Skip to content

Commit 20c2fd5

Browse files
authored
[connector/datadog] - Use waitgroup instead of closing a channel and waiting on it (#40849)
#### Description The `run()` goroutine closes the `c.exit` channel in a deferred call. There are two scenarios: 1. Best-case scenario: no error is encountered while calling `ConsumeMetrics`. 2. Worst-case scenario: we encounter an error while calling `ConsumeMetrics`, causing the goroutine to exit early and, as a result, close the `c.exit` chan. `Shutdown` then sends on that already-closed channel, and this results in a "send on closed channel" panic in the worst-case scenario. In the best-case scenario, no panic is encountered. This PR adds a waitgroup to avoid the panic. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #40845 <!--Describe what testing was performed and which tests were added.--> #### Testing Added a test case
1 parent c95fb83 commit 20c2fd5

File tree

3 files changed

+50
-3
lines changed

3 files changed

+50
-3
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: datadogconnector
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Use proper synchronization to fix a collector panic when an error occurs
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40845]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

connector/datadogconnector/connector_native.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package datadogconnector // import "github.com/open-telemetry/opentelemetry-coll
66
import (
77
"context"
88
"fmt"
9+
"sync"
910
"time"
1011

1112
"github.com/DataDog/datadog-agent/comp/otelcol/otlp/components/statsprocessor"
@@ -29,6 +30,8 @@ type traceToMetricConnectorNative struct {
2930
metricsConsumer consumer.Metrics // the next component in the pipeline to ingest metrics after connector
3031
logger *zap.Logger
3132

33+
wg sync.WaitGroup
34+
3235
// concentrator ingests spans and produces APM stats
3336
concentrator *stats.Concentrator
3437

@@ -100,6 +103,7 @@ func newTraceToMetricConnectorNative(set component.TelemetrySettings, cfg compon
100103
func (c *traceToMetricConnectorNative) Start(_ context.Context, _ component.Host) error {
101104
c.logger.Info("Starting datadogconnector")
102105
c.concentrator.Start()
106+
c.wg.Add(1)
103107
go c.run()
104108
c.isStarted = true
105109
return nil
@@ -117,8 +121,8 @@ func (c *traceToMetricConnectorNative) Shutdown(context.Context) error {
117121
// stop the obfuscator and concentrator and wait for the run loop to exit
118122
c.obfuscator.Stop()
119123
c.concentrator.Stop()
120-
c.exit <- struct{}{} // signal exit
121-
<-c.exit // wait for close
124+
close(c.exit)
125+
c.wg.Wait()
122126
return nil
123127
}
124128

@@ -139,7 +143,7 @@ func (c *traceToMetricConnectorNative) ConsumeTraces(_ context.Context, traces p
139143
// run awaits incoming stats resulting from the agent's ingestion, converts them
140144
// to metrics and flushes them using the configured metrics exporter.
141145
func (c *traceToMetricConnectorNative) run() {
142-
defer close(c.exit)
146+
defer c.wg.Done()
143147
for {
144148
select {
145149
case stats := <-c.statsout:

connector/datadogconnector/connector_native_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package datadogconnector
55

66
import (
77
"context"
8+
"errors"
89
"sort"
910
"testing"
1011
"time"
@@ -357,3 +358,18 @@ func TestObfuscate(t *testing.T) {
357358
t.Errorf("Diff between APM stats -want +got:\n%v", diff)
358359
}
359360
}
361+
362+
// TestNoPanic is a regression test for the shutdown path when the downstream
// metrics consumer fails. It swaps the connector's metricsConsumer for one that
// always returns an error, starts the connector, feeds it a single generated
// trace, and then requires Shutdown to return without error.
func TestNoPanic(t *testing.T) {
363+
c, _ := creteConnectorNative(t)
364+
// Replace the next consumer with one that always returns an error.
c.metricsConsumer = consumertest.NewErr(errors.New("error"))
365+
require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost()))
366+
trace1 := generateTrace()
367+
368+
err := c.ConsumeTraces(context.Background(), trace1)
369+
assert.NoError(t, err)
370+
371+
// NOTE(review): presumably this pause gives the background run loop time to
// hand stats to the failing consumer before Shutdown is called — confirm.
time.Sleep(2 * time.Second)
372+
373+
err = c.Shutdown(context.Background())
374+
require.NoError(t, err)
375+
}

0 commit comments

Comments
 (0)