Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
ea784b2
emit spans for opamp supervisor
bacherfl Mar 18, 2025
e8ae992
pass through context to message handling functions to emit spans
bacherfl Mar 19, 2025
f681353
Merge branch 'main' into feat/38724/emit-spans
bacherfl Mar 19, 2025
a4596a2
add e2e tests for emitting spans
bacherfl Mar 19, 2025
1123c7c
add spans for handling messages from agent
bacherfl Mar 19, 2025
3ba75fd
fix linting
bacherfl Mar 19, 2025
567ec3e
add noop telemetry provider to supervisor tests
bacherfl Mar 20, 2025
e809a47
fix linting
bacherfl Mar 20, 2025
2aff260
fix tests
bacherfl Mar 20, 2025
ed4c2de
remove ineffectual assignments
bacherfl Mar 20, 2025
b934e0d
remove ineffectual assignments
bacherfl Mar 20, 2025
7f0557a
Merge branch 'main' into feat/38724/emit-spans
bacherfl Mar 21, 2025
12eb2b4
add changelog entry
bacherfl Mar 26, 2025
0d0f410
Merge branch 'main' into feat/38724/emit-spans
bacherfl Mar 26, 2025
11683d7
adapt span names to use the same name as the functions
bacherfl Apr 1, 2025
cb53cc9
Merge branch 'main' into feat/38724/emit-spans
bacherfl Apr 1, 2025
40a08e8
adapt expected span names
bacherfl Apr 1, 2025
c50a7eb
Merge branch 'main' into feat/38724/emit-spans
bacherfl Apr 2, 2025
fa74b7d
Merge branch 'main' into feat/38724/emit-spans
bacherfl Apr 3, 2025
e41df4d
Merge branch 'main' into feat/38724/emit-spans
bacherfl Apr 4, 2025
7e1c526
Merge branch 'main' into feat/38724/emit-spans
bacherfl Apr 7, 2025
d50ec25
Merge branch 'main' into feat/38724/emit-spans
bacherfl Apr 8, 2025
cfeb50b
Merge branch 'main' into feat/38724/emit-spans
bacherfl Apr 14, 2025
f2adf30
Merge branch 'main' into feat/38724/emit-spans
bacherfl Apr 28, 2025
6a5a4fe
Merge branch 'main' into feat/38724/emit-spans
bacherfl May 8, 2025
1934181
Merge branch 'main' into feat/38724/emit-spans
bacherfl May 22, 2025
0ad3720
reduce emitted spans to handle messages from agent and from server
bacherfl May 23, 2025
35761ce
reduce emitted spans to handle messages from agent and from server
bacherfl May 23, 2025
4959459
Merge branch 'main' into feat/38724/emit-spans
bacherfl May 26, 2025
ea01515
adapt e2e tests to removed spans
bacherfl Jun 4, 2025
4f5ba9f
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jun 5, 2025
8e772b2
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jun 24, 2025
0a98d04
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jul 7, 2025
3ced0a5
fix failing tests
bacherfl Jul 7, 2025
2926bd1
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jul 8, 2025
e6018ab
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jul 8, 2025
5e037ce
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jul 10, 2025
214fd51
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jul 21, 2025
7be2d8e
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jul 24, 2025
18cfccc
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jul 28, 2025
7e199f5
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jul 28, 2025
1e68092
Merge branch 'main' into feat/38724/emit-spans
bacherfl Jul 29, 2025
0700568
Merge branch 'main' into feat/38724/emit-spans
bacherfl Aug 1, 2025
274a9c4
Merge branch 'main' into feat/38724/emit-spans
bacherfl Aug 4, 2025
fb31775
trigger CI tests
bacherfl Aug 4, 2025
e651bb2
Merge branch 'main' into feat/38724/emit-spans
bacherfl Aug 5, 2025
99fdcfb
Merge branch 'main' into feat/38724/emit-spans
bacherfl Aug 26, 2025
13330d9
Merge branch 'main' into feat/38724/emit-spans
bacherfl Aug 26, 2025
341fc8c
Apply suggestions from code review
evan-bradley Aug 29, 2025
b360920
Merge branch 'main' into feat/38724/emit-spans
evan-bradley Aug 29, 2025
1a9dba3
Merge branch 'main' into feat/38724/emit-spans
evan-bradley Aug 29, 2025
3bd9b93
Fix lint
evan-bradley Aug 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/opamp-supervisor-emit-spans.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Emit spans for handling OpAMP messages

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [38724]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
45 changes: 38 additions & 7 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2078,9 +2078,17 @@ func TestSupervisorRemoteConfigApplyStatus(t *testing.T) {
},
})

outputPath := filepath.Join(t.TempDir(), "output.txt")
backend := testbed.NewOTLPHTTPDataReceiver(4318)
mockBackend := testbed.NewMockBackend(outputPath, backend)
mockBackend.EnableRecording()
defer mockBackend.Stop()
require.NoError(t, mockBackend.Start())

extraConfigData := map[string]string{
"url": server.addr,
"config_apply_timeout": "3s",
"telemetryUrl": "localhost:4318",
}
if mode.UseHUPConfigReload {
extraConfigData["use_hup_config_reload"] = "true"
Expand Down Expand Up @@ -2199,6 +2207,20 @@ func TestSupervisorRemoteConfigApplyStatus(t *testing.T) {
status, ok := remoteConfigStatus.Load().(*protobufs.RemoteConfigStatus)
return ok && status.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED
}, 5*time.Second, 10*time.Millisecond, "Remote config status was not set to APPLIED for empty config")

gotSpans := []string{}
expectedSpans := []string{"GetBootstrapInfo", "onMessage"}
require.EventuallyWithT(t, func(collect *assert.CollectT) {
require.GreaterOrEqual(collect, len(mockBackend.ReceivedTraces), len(expectedSpans))
}, 10*time.Second, 250*time.Millisecond)

for i := 0; i < len(mockBackend.ReceivedTraces); i++ {
gotSpans = append(gotSpans, mockBackend.ReceivedTraces[i].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name())
}

for _, expectedSpan := range expectedSpans {
require.Contains(t, gotSpans, expectedSpan)
}
})
})
}
Expand Down Expand Up @@ -2406,8 +2428,6 @@ func TestSupervisorEmitBootstrapTelemetry(t *testing.T) {
})

outputPath := filepath.Join(t.TempDir(), "output.txt")
_, err = findRandomPort()
require.Nil(t, err)
backend := testbed.NewOTLPHTTPDataReceiver(4318)
mockBackend := testbed.NewMockBackend(outputPath, backend)
mockBackend.EnableRecording()
Expand Down Expand Up @@ -2450,17 +2470,28 @@ func TestSupervisorEmitBootstrapTelemetry(t *testing.T) {
return agentName == command && agentVersion == version
}, 5*time.Second, 250*time.Millisecond)

expectedSpans := []string{"GetBootstrapInfo"}

require.EventuallyWithT(t, func(collect *assert.CollectT) {
require.Len(collect, mockBackend.ReceivedTraces, 1)
require.GreaterOrEqual(collect, len(mockBackend.ReceivedTraces), len(expectedSpans))
}, 10*time.Second, 250*time.Millisecond)

require.Equal(t, 1, mockBackend.ReceivedTraces[0].ResourceSpans().Len())
gotServiceName, ok := mockBackend.ReceivedTraces[0].ResourceSpans().At(0).Resource().Attributes().Get(string(semconv.ServiceNameKey))
require.True(t, ok)
require.Equal(t, "opamp-supervisor", gotServiceName.Str())

require.Equal(t, 1, mockBackend.ReceivedTraces[0].ResourceSpans().At(0).ScopeSpans().Len())
require.Equal(t, 1, mockBackend.ReceivedTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().Len())
require.Equal(t, "GetBootstrapInfo", mockBackend.ReceivedTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name())
require.Equal(t, ptrace.StatusCodeOk, mockBackend.ReceivedTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Status().Code())
for _, expectedSpan := range expectedSpans {
gotSpan := false
for i := 0; i < len(mockBackend.ReceivedTraces); i++ {
require.Equal(t, 1, mockBackend.ReceivedTraces[i].ResourceSpans().At(0).ScopeSpans().Len())
require.Equal(t, 1, mockBackend.ReceivedTraces[i].ResourceSpans().At(0).ScopeSpans().At(0).Spans().Len())
if mockBackend.ReceivedTraces[i].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name() != expectedSpan {
continue
}
gotSpan = true
require.Equal(t, ptrace.StatusCodeOk, mockBackend.ReceivedTraces[i].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Status().Code())
}
require.Truef(t, gotSpan, "expected to find span '%s', but did not find it", expectedSpan)
}
}
34 changes: 29 additions & 5 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/contrib/bridges/otelzap"
telemetryconfig "go.opentelemetry.io/contrib/otelconf/v0.3.0"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/log"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
Expand Down Expand Up @@ -324,7 +325,6 @@ func (s *Supervisor) Start() error {
if err != nil {
return err
}

if err = s.getFeatureGates(); err != nil {
return fmt.Errorf("could not get feature gates from the Collector: %w", err)
}
Expand Down Expand Up @@ -439,6 +439,7 @@ func (s *Supervisor) createTemplates() error {
func (s *Supervisor) getBootstrapInfo() (err error) {
_, span := s.getTracer().Start(context.Background(), "GetBootstrapInfo")
defer span.End()

s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Could not get supervisor opamp service port: %v", err))
Expand Down Expand Up @@ -817,6 +818,9 @@ func (s *Supervisor) startOpAMPServer() error {
}

func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
ctx, span := s.getTracer().Start(context.Background(), "handleAgentOpAMPMessage")
defer span.End()

s.agentConn.Store(conn)

s.telemetrySettings.Logger.Debug("Received OpAMP message from the agent")
Expand All @@ -825,22 +829,28 @@ func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, messag
}

if message.EffectiveConfig != nil {
span.AddEvent("Received effectiveConfig")
if cfg, ok := message.EffectiveConfig.GetConfigMap().GetConfigMap()[""]; ok {
s.telemetrySettings.Logger.Debug("Received effective config from agent")
s.effectiveConfig.Store(string(cfg.Body))
err := s.opampClient.UpdateEffectiveConfig(context.Background())
err := s.opampClient.UpdateEffectiveConfig(ctx)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Could not update effective config: %s", err.Error()))
s.telemetrySettings.Logger.Error("The OpAMP client failed to update the effective config", zap.Error(err))
}
} else {
s.telemetrySettings.Logger.Error("Got effective config message, but the instance config was not present. Ignoring effective config.")
msg := "Got effective config message, but the instance config was not present. Ignoring effective config."
span.SetStatus(codes.Error, msg)
s.telemetrySettings.Logger.Error(msg)
}
}

// Proxy client capabilities to server
if message.CustomCapabilities != nil {
span.AddEvent("Received customCapabilities")
err := s.opampClient.SetCustomCapabilities(message.CustomCapabilities)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Failed to send custom capabilities to OpAMP server: %s", err.Error()))
s.telemetrySettings.Logger.Error("Failed to send custom capabilities to OpAMP server")
}
}
Expand All @@ -859,6 +869,10 @@ func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, messag
}

if message.Health != nil {
span.AddEvent("Received health", trace.WithAttributes(attribute.KeyValue{
Key: "health",
Value: attribute.BoolValue(message.Health.Healthy),
}))
s.telemetrySettings.Logger.Debug("Received health status from agent", zap.Bool("healthy", message.Health.Healthy))
s.lastHealthFromClient.Store(message.Health)
err := s.SetHealth(message.Health)
Expand Down Expand Up @@ -1847,14 +1861,16 @@ func (s *Supervisor) SetHealth(componentHealth *protobufs.ComponentHealth) error
}

func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) {
ctx, span := s.getTracer().Start(ctx, "onMessage")
defer span.End()
configChanged := false

if msg.AgentIdentification != nil {
configChanged = s.processAgentIdentificationMessage(msg.AgentIdentification) || configChanged
}

if msg.RemoteConfig != nil {
configChanged = s.processRemoteConfigMessage(msg.RemoteConfig) || configChanged
configChanged = s.processRemoteConfigMessage(ctx, msg.RemoteConfig) || configChanged
}

if msg.OwnMetricsConnSettings != nil || msg.OwnTracesConnSettings != nil || msg.OwnLogsConnSettings != nil {
Expand All @@ -1867,6 +1883,7 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) {

// Update the agent config if any messages have touched the config
if configChanged {
span.AddEvent("Config changed")
err := s.opampClient.UpdateEffectiveConfig(ctx)
if err != nil {
s.telemetrySettings.Logger.Error("The OpAMP client failed to update the effective config", zap.Error(err))
Expand Down Expand Up @@ -1906,16 +1923,21 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) {
}
}
}

span.SetStatus(codes.Ok, "")
}

// processRemoteConfigMessage processes an AgentRemoteConfig message, returning true if the agent config has changed.
func (s *Supervisor) processRemoteConfigMessage(msg *protobufs.AgentRemoteConfig) bool {
func (s *Supervisor) processRemoteConfigMessage(ctx context.Context, msg *protobufs.AgentRemoteConfig) bool {
_, span := s.getTracer().Start(ctx, "processRemoteConfigMessage")
defer span.End()
if !s.config.Capabilities.AcceptsRemoteConfig {
s.telemetrySettings.Logger.Warn("Got remote config message, but the agent does not accept remote config. Ignoring remote config.")
return false
}

if err := s.saveLastReceivedConfig(msg); err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Could not save last received remote config: %s", err.Error()))
s.telemetrySettings.Logger.Error("Could not save last received remote config", zap.Error(err))
}

Expand All @@ -1925,6 +1947,7 @@ func (s *Supervisor) processRemoteConfigMessage(msg *protobufs.AgentRemoteConfig
var err error
configChanged, err := s.composeMergedConfig(s.remoteConfig)
if err != nil {
span.SetStatus(codes.Error, fmt.Sprintf("Error composing merged config. Reporting failed remote config status: %s", err.Error()))
s.telemetrySettings.Logger.Error("Error composing merged config. Reporting failed remote config status.", zap.Error(err))
s.saveAndReportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, err.Error())
}
Expand All @@ -1933,6 +1956,7 @@ func (s *Supervisor) processRemoteConfigMessage(msg *protobufs.AgentRemoteConfig
s.saveAndReportConfigStatus(protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLYING, "")
}

span.SetStatus(codes.Ok, "")
return configChanged
}

Expand Down
19 changes: 12 additions & 7 deletions cmd/opampsupervisor/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -137,7 +138,8 @@ func setupSupervisorConfig(t *testing.T, configuration string) config.Supervisor
// newNopTelemetrySettings returns telemetrySettings whose embedded
// component.TelemetrySettings uses a no-op zap logger and a no-op tracer
// provider, so tests can build a Supervisor that logs and starts spans
// without emitting any telemetry.
func newNopTelemetrySettings() telemetrySettings {
	return telemetrySettings{
		TelemetrySettings: component.TelemetrySettings{
			// Each field may appear only once in a composite literal.
			Logger:         zap.NewNop(),
			TracerProvider: noop.NewTracerProvider(),
		},
	}
}
Expand Down Expand Up @@ -1587,17 +1589,19 @@ service:
func TestSupervisor_createEffectiveConfigMsg(t *testing.T) {
t.Run("empty config", func(t *testing.T) {
s := Supervisor{
effectiveConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
cfgState: &atomic.Value{},
telemetrySettings: newNopTelemetrySettings(),
}
got := s.createEffectiveConfigMsg()

assert.Empty(t, got.ConfigMap.ConfigMap[""].Body)
})
t.Run("effective and merged config set - prefer effective config", func(t *testing.T) {
s := Supervisor{
effectiveConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
cfgState: &atomic.Value{},
telemetrySettings: newNopTelemetrySettings(),
}

s.effectiveConfig.Store("effective")
Expand All @@ -1609,8 +1613,9 @@ func TestSupervisor_createEffectiveConfigMsg(t *testing.T) {
})
t.Run("only merged config set", func(t *testing.T) {
s := Supervisor{
effectiveConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
cfgState: &atomic.Value{},
telemetrySettings: newNopTelemetrySettings(),
}

s.cfgState.Store(&configState{mergedConfig: "merged"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,20 @@ agent:
{{- if .use_hup_config_reload }}
use_hup_config_reload: {{ .use_hup_config_reload }}
{{- end }}

telemetry:
traces:
processors:
- simple:
exporter:
otlp:
protocol: http/protobuf
endpoint: http://{{.telemetryUrl}}
logs:
level: debug
processors:
- simple:
exporter:
otlp:
protocol: http/protobuf
endpoint: http://{{.telemetryUrl}}
Loading