16 changes: 16 additions & 0 deletions .chloggen/spanmetrics_exemplars_type_fields.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Sets the TraceID and SpanID fields on the Exemplar type (as per the spec) and removes the use of FilteredAttributes to pass these values around.

# One or more tracking issues related to the change
issues: [13401]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
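
For readers who only skim the changelog, here is a minimal, self-contained sketch (not code from this PR; the byte values and the use of package main are placeholders) of the pdata calls the note is describing:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	// Build a histogram data point and attach a single exemplar to it.
	dp := pmetric.NewHistogramDataPoint()
	ex := dp.Exemplars().AppendEmpty()
	ex.SetDoubleValue(42.0) // e.g. a span latency in milliseconds

	// New behaviour: the span context rides on the exemplar's own fields,
	// which is what the OpenTelemetry data model reserves them for.
	ex.SetTraceID(pcommon.TraceID([16]byte{0x64, 0x1d}))
	ex.SetSpanID(pcommon.SpanID([8]byte{0x74, 0x36}))

	// Old behaviour removed by this change: tunnelling the trace ID through
	// FilteredAttributes, e.g. ex.FilteredAttributes().PutStr("trace_id", ...).

	fmt.Printf("trace_id=%x span_id=%x\n", ex.TraceID(), ex.SpanID())
}
```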
16 changes: 10 additions & 6 deletions exporter/prometheusexporter/collector.go
@@ -219,18 +219,22 @@ func (c *collector) convertDoubleHistogram(metric pmetric.Metric, resourceAttrs

arrLen := ip.Exemplars().Len()
exemplars := make([]prometheus.Exemplar, arrLen)

for i := 0; i < arrLen; i++ {
e := ip.Exemplars().At(i)
exemplarLabels := make(prometheus.Labels, 0)

labels := make(prometheus.Labels, e.FilteredAttributes().Len())
e.FilteredAttributes().Range(func(k string, v pcommon.Value) bool {
labels[k] = v.AsString()
return true
})
if !e.TraceID().IsEmpty() {
exemplarLabels["trace_id"] = e.TraceID().HexString()
}

if !e.SpanID().IsEmpty() {
exemplarLabels["span_id"] = e.SpanID().HexString()
}

exemplars[i] = prometheus.Exemplar{
Value: e.DoubleValue(),
Labels: labels,
Labels: exemplarLabels,
Timestamp: e.Timestamp().AsTime(),
}
}
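The same conversion, pulled out of the convertDoubleHistogram loop as a standalone sketch for readability. The package and function names are illustrative; the API calls are exactly the ones visible in the hunk above. The design choice is that the exporter, not the producer, decides how trace context is surfaced, here as the conventional trace_id/span_id exemplar labels.

```go
package promexemplarsketch

import (
	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// toPromExemplar mirrors the loop body above: an OTLP exemplar's TraceID and
// SpanID become trace_id/span_id labels on the Prometheus exemplar, and empty
// IDs are simply omitted.
func toPromExemplar(e pmetric.Exemplar) prometheus.Exemplar {
	exemplarLabels := make(prometheus.Labels, 0)

	if !e.TraceID().IsEmpty() {
		exemplarLabels["trace_id"] = e.TraceID().HexString()
	}
	if !e.SpanID().IsEmpty() {
		exemplarLabels["span_id"] = e.SpanID().HexString()
	}

	return prometheus.Exemplar{
		Value:     e.DoubleValue(),
		Labels:    exemplarLabels,
		Timestamp: e.Timestamp().AsTime(),
	}
}
```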
84 changes: 29 additions & 55 deletions exporter/prometheusexporter/collector_test.go
@@ -15,6 +15,7 @@
package prometheusexporter

import (
"encoding/hex"
"strings"
"testing"
"time"
@@ -103,48 +103,33 @@ func TestConvertDoubleHistogramExemplar(t *testing.T) {
metric.SetUnit("T")

// initialize empty datapoint
hd := metric.SetEmptyHistogram().DataPoints().AppendEmpty()
histogramDataPoint := metric.SetEmptyHistogram().DataPoints().AppendEmpty()

hd.ExplicitBounds().FromRaw([]float64{5, 25, 90})
hd.BucketCounts().FromRaw([]uint64{2, 35, 70})
histogramDataPoint.ExplicitBounds().FromRaw([]float64{5, 25, 90})
histogramDataPoint.BucketCounts().FromRaw([]uint64{2, 35, 70})

exemplarTs, _ := time.Parse("unix", "Mon Jan _2 15:04:05 MST 2006")
exemplars := []prometheus.Exemplar{
{
Timestamp: exemplarTs,
Value: 3,
Labels: prometheus.Labels{"test_label_0": "label_value_0"},
},
{
Timestamp: exemplarTs,
Value: 50,
Labels: prometheus.Labels{"test_label_1": "label_value_1"},
},
{
Timestamp: exemplarTs,
Value: 78,
Labels: prometheus.Labels{"test_label_2": "label_value_2"},
},
{
Timestamp: exemplarTs,
Value: 100,
Labels: prometheus.Labels{"test_label_3": "label_value_3"},
},
}
// add test exemplar values to the metric
promExporterExemplars := histogramDataPoint.Exemplars().AppendEmpty()
var tBytes [16]byte
testTraceID, _ := hex.DecodeString("641d68e314a58152cc2581e7663435d1")
copy(tBytes[:], testTraceID)
traceID := pcommon.TraceID(tBytes)
promExporterExemplars.SetTraceID(traceID)

// add each exemplar value to the metric
for _, e := range exemplars {
pde := hd.Exemplars().AppendEmpty()
pde.SetDoubleValue(e.Value)
for k, v := range e.Labels {
pde.FilteredAttributes().PutStr(k, v)
}
pde.SetTimestamp(pcommon.NewTimestampFromTime(e.Timestamp))
}
var sBytes [8]byte
testSpanID, _ := hex.DecodeString("7436d6ac76178623")
copy(sBytes[:], testSpanID)
spanID := pcommon.SpanID(sBytes)
promExporterExemplars.SetSpanID(spanID)

exemplarTs, _ := time.Parse("unix", "Mon Jan _2 15:04:05 MST 2006")
promExporterExemplars.SetTimestamp(pcommon.NewTimestampFromTime(exemplarTs))
promExporterExemplars.SetDoubleValue(3.0)

pMap := pcommon.NewMap()

c := collector{

accumulator: &mockAccumulator{
metrics: []pmetric.Metric{metric},
resourceAttributes: pMap,
@@ -161,29 +161,17 @@ func TestConvertDoubleHistogramExemplar(t *testing.T) {

buckets := m.GetHistogram().GetBucket()

require.Equal(t, 4, len(buckets))
require.Equal(t, 3, len(buckets))

require.Equal(t, 3.0, buckets[0].GetExemplar().GetValue())
require.Equal(t, int32(128654848), buckets[0].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 1, len(buckets[0].GetExemplar().GetLabel()))
require.Equal(t, "test_label_0", buckets[0].GetExemplar().GetLabel()[0].GetName())
require.Equal(t, "label_value_0", buckets[0].GetExemplar().GetLabel()[0].GetValue())

require.Equal(t, 0.0, buckets[1].GetExemplar().GetValue())
require.Equal(t, int32(0), buckets[1].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 0, len(buckets[1].GetExemplar().GetLabel()))

require.Equal(t, 78.0, buckets[2].GetExemplar().GetValue())
require.Equal(t, int32(128654848), buckets[2].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 1, len(buckets[2].GetExemplar().GetLabel()))
require.Equal(t, "test_label_2", buckets[2].GetExemplar().GetLabel()[0].GetName())
require.Equal(t, "label_value_2", buckets[2].GetExemplar().GetLabel()[0].GetValue())

require.Equal(t, 100.0, buckets[3].GetExemplar().GetValue())
require.Equal(t, int32(128654848), buckets[3].GetExemplar().GetTimestamp().GetNanos())
require.Equal(t, 1, len(buckets[3].GetExemplar().GetLabel()))
require.Equal(t, "test_label_3", buckets[3].GetExemplar().GetLabel()[0].GetName())
require.Equal(t, "label_value_3", buckets[3].GetExemplar().GetLabel()[0].GetValue())
require.Equal(t, 2, len(buckets[0].GetExemplar().GetLabel()))
ml := make(map[string]string)
for _, l := range buckets[0].GetExemplar().GetLabel() {
ml[l.GetName()] = l.GetValue()
}
require.Equal(t, "641d68e314a58152cc2581e7663435d1", ml["trace_id"])
require.Equal(t, "7436d6ac76178623", ml["span_id"])
}

// errorCheckCore keeps track of logged errors
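The test above builds the IDs inline with hex.DecodeString plus a copy into a fixed-size array. A hypothetical pair of helpers showing the same pattern in isolation (names are illustrative, not part of the PR):

```go
package promexemplarsketch

import (
	"encoding/hex"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

// traceIDFromHex decodes a 32-character hex string into the fixed-size
// [16]byte array that backs pcommon.TraceID.
func traceIDFromHex(s string) (pcommon.TraceID, error) {
	var id [16]byte
	b, err := hex.DecodeString(s)
	if err != nil {
		return pcommon.TraceID(id), err
	}
	copy(id[:], b)
	return pcommon.TraceID(id), nil
}

// spanIDFromHex does the same for the 16-character span ID.
func spanIDFromHex(s string) (pcommon.SpanID, error) {
	var id [8]byte
	b, err := hex.DecodeString(s)
	if err != nil {
		return pcommon.SpanID(id), err
	}
	copy(id[:], b)
	return pcommon.SpanID(id), nil
}
```

Usage would look like `traceID, err := traceIDFromHex("641d68e314a58152cc2581e7663435d1")`, matching the literal used in the test.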
11 changes: 7 additions & 4 deletions processor/spanmetricsprocessor/processor.go
@@ -42,7 +42,6 @@ const (
spanKindKey = "span.kind" // OpenTelemetry non-standard constant.
statusCodeKey = "status.code" // OpenTelemetry non-standard constant.
metricKeySeparator = string(byte(0))
traceIDKey = "trace_id"

defaultDimensionsCacheSize = 1000
)
@@ -55,6 +54,7 @@

type exemplarData struct {
traceID pcommon.TraceID
spanID pcommon.SpanID
value float64
}

@@ -390,7 +390,7 @@ func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.S
p.cache(serviceName, span, key, resourceAttr)
p.updateCallMetrics(key)
p.updateLatencyMetrics(key, latencyInMilliseconds, index)
p.updateLatencyExemplars(key, latencyInMilliseconds, span.TraceID())
p.updateLatencyExemplars(key, latencyInMilliseconds, span.TraceID(), span.SpanID())
}

// updateCallMetrics increments the call count for the given metric key.
@@ -409,13 +409,14 @@ func (p *processorImp) resetAccumulatedMetrics() {
}

// updateLatencyExemplars sets the histogram exemplars for the given metric key and appends the exemplar data.
func (p *processorImp) updateLatencyExemplars(key metricKey, value float64, traceID pcommon.TraceID) {
func (p *processorImp) updateLatencyExemplars(key metricKey, value float64, traceID pcommon.TraceID, spanID pcommon.SpanID) {
if _, ok := p.latencyExemplarsData[key]; !ok {
p.latencyExemplarsData[key] = []exemplarData{}
}

e := exemplarData{
traceID: traceID,
spanID: spanID,
value: value,
}
p.latencyExemplarsData[key] = append(p.latencyExemplarsData[key], e)
@@ -559,6 +560,7 @@ func setLatencyExemplars(exemplarsData []exemplarData, timestamp pcommon.Timesta
for _, ed := range exemplarsData {
value := ed.value
traceID := ed.traceID
spanID := ed.spanID

exemplar := es.AppendEmpty()

@@ -568,7 +570,8 @@

exemplar.SetDoubleValue(value)
exemplar.SetTimestamp(timestamp)
exemplar.FilteredAttributes().PutStr(traceIDKey, traceID.HexString())
exemplar.SetTraceID(traceID)
exemplar.SetSpanID(spanID)
}

es.CopyTo(exemplars)
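A condensed sketch of the two pieces this file changes: exemplarData now carries the span ID alongside the trace ID, and setLatencyExemplars writes both onto the pmetric.Exemplar fields instead of a trace_id filtered attribute. This is a simplified version that skips whatever the collapsed lines of the hunk contain; the package name is illustrative:

```go
package spanmetricssketch

import (
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// exemplarData mirrors the struct above: one recorded latency value plus the
// trace/span context it was observed in.
type exemplarData struct {
	traceID pcommon.TraceID
	spanID  pcommon.SpanID
	value   float64
}

// setLatencyExemplars converts the accumulated exemplarData entries into
// pmetric exemplars, carrying the IDs in their dedicated fields.
func setLatencyExemplars(data []exemplarData, timestamp pcommon.Timestamp, exemplars pmetric.ExemplarSlice) {
	es := pmetric.NewExemplarSlice()
	es.EnsureCapacity(len(data))

	for _, ed := range data {
		exemplar := es.AppendEmpty()
		exemplar.SetDoubleValue(ed.value)
		exemplar.SetTimestamp(timestamp)
		exemplar.SetTraceID(ed.traceID)
		exemplar.SetSpanID(ed.spanID)
	}

	es.CopyTo(exemplars)
}
```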
17 changes: 11 additions & 6 deletions processor/spanmetricsprocessor/processor_test.go
@@ -622,6 +622,7 @@ func initSpan(span span, s ptrace.Span) {
now := time.Now()
s.SetStartTimestamp(pcommon.NewTimestampFromTime(now))
s.SetEndTimestamp(pcommon.NewTimestampFromTime(now.Add(sampleLatencyDuration)))

s.Attributes().PutStr(stringAttrName, "stringAttrValue")
s.Attributes().PutInt(intAttrName, 99)
s.Attributes().PutDouble(doubleAttrName, 99.99)
@@ -630,6 +631,7 @@
s.Attributes().PutEmptyMap(mapAttrName)
s.Attributes().PutEmptySlice(arrayAttrName)
s.SetTraceID(pcommon.TraceID([16]byte{byte(42)}))
s.SetSpanID(pcommon.SpanID([8]byte{byte(42)}))
}

func newOTLPExporters(t *testing.T) (*otlpexporter.Config, component.MetricsExporter, component.TracesExporter) {
@@ -841,21 +843,23 @@ func TestSetLatencyExemplars(t *testing.T) {
// ----- conditions -------------------------------------------------------
traces := buildSampleTrace()
traceID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
spanID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SpanID()
exemplarSlice := pmetric.NewExemplarSlice()
timestamp := pcommon.NewTimestampFromTime(time.Now())
value := float64(42)

ed := []exemplarData{{traceID: traceID, value: value}}
ed := []exemplarData{{traceID: traceID, spanID: spanID, value: value}}

// ----- call -------------------------------------------------------------
setLatencyExemplars(ed, timestamp, exemplarSlice)

// ----- verify -----------------------------------------------------------
traceIDValue, exist := exemplarSlice.At(0).FilteredAttributes().Get(traceIDKey)
traceIDValue := exemplarSlice.At(0).TraceID()
spanIDValue := exemplarSlice.At(0).SpanID()

assert.NotEmpty(t, exemplarSlice)
assert.True(t, exist)
assert.Equal(t, traceIDValue.AsString(), traceID.HexString())
assert.Equal(t, traceIDValue, traceID)
assert.Equal(t, spanIDValue, spanID)
assert.Equal(t, exemplarSlice.At(0).Timestamp(), timestamp)
assert.Equal(t, exemplarSlice.At(0).DoubleValue(), value)
}
@@ -866,18 +870,19 @@ func TestProcessorUpdateLatencyExemplars(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)
traces := buildSampleTrace()
traceID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
spanID := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SpanID()
key := metricKey("metricKey")
next := new(consumertest.TracesSink)
p, err := newProcessor(zaptest.NewLogger(t), cfg, next)
value := float64(42)

// ----- call -------------------------------------------------------------
p.updateLatencyExemplars(key, value, traceID)
p.updateLatencyExemplars(key, value, traceID, spanID)

// ----- verify -----------------------------------------------------------
assert.NoError(t, err)
assert.NotEmpty(t, p.latencyExemplarsData[key])
assert.Equal(t, p.latencyExemplarsData[key][0], exemplarData{traceID: traceID, value: value})
assert.Equal(t, p.latencyExemplarsData[key][0], exemplarData{traceID: traceID, spanID: spanID, value: value})
}

func TestProcessorResetExemplarData(t *testing.T) {
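To close the loop, a hypothetical, self-contained test in the same spirit as the updated assertions above. It reuses the exemplarData/setLatencyExemplars sketch from the processor.go section (same illustrative package) and the testify assertions the real tests already use:

```go
package spanmetricssketch

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

// TestExemplarCarriesSpanContext checks that the exemplar exposes the IDs via
// its TraceID/SpanID fields and no longer via FilteredAttributes.
func TestExemplarCarriesSpanContext(t *testing.T) {
	traceID := pcommon.TraceID([16]byte{1, 2, 3})
	spanID := pcommon.SpanID([8]byte{4, 5, 6})
	timestamp := pcommon.NewTimestampFromTime(time.Now())

	slice := pmetric.NewExemplarSlice()
	setLatencyExemplars([]exemplarData{{traceID: traceID, spanID: spanID, value: 42}}, timestamp, slice)

	assert.Equal(t, 1, slice.Len())
	assert.Equal(t, traceID, slice.At(0).TraceID())
	assert.Equal(t, spanID, slice.At(0).SpanID())
	assert.Equal(t, 0, slice.At(0).FilteredAttributes().Len())
}
```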