Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions internal/healthcheck/internal/http/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ type serializableStatus struct {

// SerializableEvent is exported for json.Unmarshal
type SerializableEvent struct {
Healthy bool `json:"healthy"`
StatusString string `json:"status"`
Error string `json:"error,omitempty"`
Timestamp time.Time `json:"status_time"`
Healthy bool `json:"healthy"`
StatusString string `json:"status"`
Error string `json:"error,omitempty"`
Timestamp time.Time `json:"status_time"`
Attributes map[string]any `json:"attributes"`
}

var stringToStatusMap = map[string]componentstatus.Status{
Expand All @@ -63,6 +64,7 @@ func toSerializableEvent(ev status.Event, isHealthy bool) *SerializableEvent {
Healthy: isHealthy,
StatusString: ev.Status().String(),
Timestamp: ev.Timestamp(),
Attributes: ev.Attributes().AsRaw(),
}
if ev.Err() != nil {
se.Error = ev.Err().Error()
Expand Down
1 change: 1 addition & 0 deletions internal/healthcheck/internal/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
healthy bool
status componentstatus.Status
err error
attributes map[string]any

Check failure on line 143 in internal/healthcheck/internal/http/server_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, internal)

field attributes is unused (unused)

Check failure on line 143 in internal/healthcheck/internal/http/server_test.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, internal)

field attributes is unused (unused)
nestedStatus map[string]*componentStatusExpectation
}

Expand Down
18 changes: 13 additions & 5 deletions pkg/status/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ import (
"time"

"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/pdata/pcommon"
)

// statusEvent contains a status, a timestamp, and attributes, and can contain
// an error. Note: this is duplicated from core because we need to be able to
// "rewrite" the timestamps of some events during aggregation.
type statusEvent struct {
status componentstatus.Status
err error
timestamp time.Time
status componentstatus.Status
err error
timestamp time.Time
attributes pcommon.Map
}

var _ Event = (*statusEvent)(nil)
Expand All @@ -35,6 +37,11 @@ func (ev *statusEvent) Timestamp() time.Time {
return ev.timestamp
}

// Attributes returns the attributes associated with the StatusEvent.
func (ev *statusEvent) Attributes() pcommon.Map {
return ev.attributes
}

type ErrorPriority int

const (
Expand Down Expand Up @@ -149,8 +156,9 @@ func newAggregationFunc(priority ErrorPriority) aggregationFunc {

// the aggregate status requires a synthetic event
return &statusEvent{
status: status,
timestamp: lastEvent.Timestamp(),
status: status,
timestamp: lastEvent.Timestamp(),
attributes: lastEvent.Attributes(),
}
}
}
4 changes: 3 additions & 1 deletion pkg/status/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pipeline"
)

Expand All @@ -21,6 +22,7 @@ type Event interface {
Status() componentstatus.Status
Err() error
Timestamp() time.Time
Attributes() pcommon.Map
}

// Scope refers to a part of an AggregateStatus. The zero-value, aka ScopeAll,
Expand Down Expand Up @@ -96,7 +98,7 @@ type Aggregator struct {
func NewAggregator(errPriority ErrorPriority) *Aggregator {
return &Aggregator{
aggregateStatus: &AggregateStatus{
Event: &componentstatus.Event{},
Event: componentstatus.NewEvent(componentstatus.StatusNone),
ComponentStatusMap: make(map[string]*AggregateStatus),
},
subscriptions: make(map[string]*list.List),
Expand Down
26 changes: 22 additions & 4 deletions pkg/status/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pipeline"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
Expand All @@ -26,6 +27,7 @@ func TestAggregateStatus(t *testing.T) {
st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assert.Equal(t, componentstatus.StatusNone, st.Status())
assert.Equal(t, pcommon.NewMap(), st.Attributes())
})

testhelpers.SeedAggregator(agg, traces.InstanceIDs(), componentstatus.StatusOK)
Expand All @@ -47,21 +49,27 @@ func TestAggregateStatus(t *testing.T) {
assertErrorEventsMatch(t,
componentstatus.StatusRecoverableError,
assert.AnError,
pcommon.NewMap(),
st,
)
})

attrs := pcommon.NewMap()
attrs.PutStr("error.msg", "error cannot be recovered")
agg.RecordStatus(
traces.ExporterID,
componentstatus.NewPermanentErrorEvent(assert.AnError),
componentstatus.NewEvent(componentstatus.StatusPermanentError,
componentstatus.WithError(assert.AnError),
componentstatus.WithAttributes(attrs)),
)

t.Run("pipeline with permanent error", func(t *testing.T) {
t.Run("pipeline with permanent error and attributes", func(t *testing.T) {
st, ok := agg.AggregateStatus(status.ScopeAll, status.Concise)
require.True(t, ok)
assertErrorEventsMatch(t,
componentstatus.StatusPermanentError,
assert.AnError,
attrs,
st,
)
})
Expand Down Expand Up @@ -110,6 +118,7 @@ func TestAggregateStatusVerbose(t *testing.T) {
t,
componentstatus.StatusRecoverableError,
assert.AnError,
pcommon.NewMap(),
st,
st.ComponentStatusMap[tracesKey],
)
Expand All @@ -124,6 +133,7 @@ func TestAggregateStatusVerbose(t *testing.T) {
assertErrorEventsMatch(t,
componentstatus.StatusRecoverableError,
assert.AnError,
pcommon.NewMap(),
st.ComponentStatusMap[tracesKey].ComponentStatusMap[toComponentKey(traces.ExporterID)],
)
})
Expand Down Expand Up @@ -152,6 +162,7 @@ func TestAggregateStatusPriorityRecoverable(t *testing.T) {
assertErrorEventsMatch(t,
componentstatus.StatusPermanentError,
assert.AnError,
pcommon.NewMap(),
st,
)
})
Expand All @@ -167,6 +178,7 @@ func TestAggregateStatusPriorityRecoverable(t *testing.T) {
assertErrorEventsMatch(t,
componentstatus.StatusRecoverableError,
assert.AnError,
pcommon.NewMap(),
st,
)
})
Expand Down Expand Up @@ -204,7 +216,7 @@ func TestPipelineAggregateStatus(t *testing.T) {
status.Concise,
)
require.True(t, ok)
assertErrorEventsMatch(t, componentstatus.StatusRecoverableError, assert.AnError, st)
assertErrorEventsMatch(t, componentstatus.StatusRecoverableError, assert.AnError, pcommon.NewMap(), st)
})
}

Expand Down Expand Up @@ -238,7 +250,7 @@ func TestPipelineAggregateStatusVerbose(t *testing.T) {
require.True(t, ok)

// Top-level status matches
assertErrorEventsMatch(t, componentstatus.StatusRecoverableError, assert.AnError, st)
assertErrorEventsMatch(t, componentstatus.StatusRecoverableError, assert.AnError, pcommon.NewMap(), st)

// Component statuses match
assertEventsMatch(t,
Expand All @@ -248,6 +260,7 @@ func TestPipelineAggregateStatusVerbose(t *testing.T) {
assertErrorEventsMatch(t,
componentstatus.StatusRecoverableError,
assert.AnError,
pcommon.NewMap(),
st.ComponentStatusMap[toComponentKey(traces.ExporterID)],
)
})
Expand Down Expand Up @@ -279,6 +292,7 @@ func TestAggregateStatusExtensions(t *testing.T) {
assertErrorEventsMatch(t,
componentstatus.StatusRecoverableError,
assert.AnError,
pcommon.NewMap(),
st,
)
})
Expand Down Expand Up @@ -400,6 +414,7 @@ func TestStreamingVerbose(t *testing.T) {
assertErrorEventsMatch(t,
componentstatus.StatusRecoverableError,
assert.AnError,
pcommon.NewMap(),
st,
st.ComponentStatusMap[tracesKey],
)
Expand All @@ -414,6 +429,7 @@ func TestStreamingVerbose(t *testing.T) {
assertErrorEventsMatch(t,
componentstatus.StatusRecoverableError,
assert.AnError,
pcommon.NewMap(),
st.ComponentStatusMap[tracesKey].ComponentStatusMap[toComponentKey(traces.ExporterID)],
)
})
Expand Down Expand Up @@ -471,13 +487,15 @@ func assertErrorEventsMatch(
t *testing.T,
expectedStatus componentstatus.Status,
expectedErr error,
expectedAttrs pcommon.Map,
statuses ...*status.AggregateStatus,
) {
assert.True(t, componentstatus.StatusIsError(expectedStatus))
for _, st := range statuses {
ev := st.Event
assert.Equal(t, expectedStatus, ev.Status())
assert.Equal(t, expectedErr, ev.Err())
assert.True(t, expectedAttrs.Equal(ev.Attributes()))
}
}

Expand Down
Loading