25 changes: 16 additions & 9 deletions receiver/prometheusremotewritereceiver/receiver.go
@@ -260,14 +260,16 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
continue
}

// Create a new resource metrics instance to avoid modifying the existing one and to avoid race conditions when adding or removing attributes.
rm := pmetric.NewResourceMetrics()
// If the metric name is equal to target_info, we use its labels as attributes of the resource
// Ref: https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1
if metadata.Name == "target_info" {
var rm pmetric.ResourceMetrics
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))

if existingRM, ok := prw.rmCache.Get(hashedLabels); ok {
rm = existingRM
// We need to copy the resource metrics to avoid modifying the existing one and avoid race conditions when adding or removing attributes.
existingRM.CopyTo(rm)
} else {
rm = otelMetrics.ResourceMetrics().AppendEmpty()
}
@@ -281,6 +283,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
attrs.PutStr(labelName, labelValue)
}
}
// After modifying the resource metrics, add the copied version back to the cache.
prw.rmCache.Add(hashedLabels, rm)
continue
}
@@ -309,12 +312,12 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
// Handle regular metrics (gauge, counter, summary)
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
existingRM, ok := prw.rmCache.Get(hashedLabels)
var rm pmetric.ResourceMetrics
if ok {
rm = existingRM
existingRM.CopyTo(rm)
} else {
rm = otelMetrics.ResourceMetrics().AppendEmpty()
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
// After modifying the resource metrics, add the copied version back to the cache.
prw.rmCache.Add(hashedLabels, rm)
}

@@ -411,10 +414,12 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries(
return
}

var rm pmetric.ResourceMetrics
var hashedLabels uint64
var resourceID identity.Resource
var scope pmetric.ScopeMetrics
var (
hashedLabels uint64
resourceID identity.Resource
scope pmetric.ScopeMetrics
rm pmetric.ResourceMetrics
)

for i := range ts.Histograms {
histogram := &ts.Histograms[i]
@@ -440,10 +445,12 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries(
hashedLabels = xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
existingRM, ok := prw.rmCache.Get(hashedLabels)
if ok {
rm = existingRM
// We need to copy the resource metrics to avoid modifying the existing one and avoid race conditions when adding or removing attributes.
existingRM.CopyTo(rm)
} else {
rm = otelMetrics.ResourceMetrics().AppendEmpty()
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
// After modifying the resource metrics, add the copied version back to the cache.
prw.rmCache.Add(hashedLabels, rm)
}
resourceID = identity.OfResource(rm.Resource())
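
The change above follows a copy-on-write pattern against the resource cache: a cached ResourceMetrics is never mutated in place; it is copied into a fresh ResourceMetrics, the copy is modified, and the copy is stored back. Below is a minimal standalone sketch of that pattern. It assumes a plain map in place of the receiver's LRU rmCache, and updateCachedResource plus the service.* attribute names are hypothetical stand-ins for the receiver's parseJobAndInstance handling, not the actual implementation.

package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// updateCachedResource mirrors the pattern from the diff: copy the cached
// ResourceMetrics, mutate only the copy, then publish the copy back to the cache.
// A plain map stands in here for the receiver's LRU cache (rmCache).
func updateCachedResource(cache map[uint64]pmetric.ResourceMetrics, key uint64, job, instance string) pmetric.ResourceMetrics {
	rm := pmetric.NewResourceMetrics()
	if cached, ok := cache[key]; ok {
		// Work on a private copy so concurrent requests never mutate the shared cached value.
		cached.CopyTo(rm)
	} else {
		// Simplified stand-in for parseJobAndInstance: derive resource attributes from job/instance.
		rm.Resource().Attributes().PutStr("service.name", job)
		rm.Resource().Attributes().PutStr("service.instance.id", instance)
	}
	// ...add or change attributes, scopes, and data points on rm here...
	cache[key] = rm // store the updated copy, never the shared original
	return rm
}

func main() {
	cache := map[uint64]pmetric.ResourceMetrics{}
	rm := updateCachedResource(cache, 42, "test_job", "test_instance")
	fmt.Println(rm.Resource().Attributes().Len()) // 2
}

The property that matters is that the value placed in the cache is one no in-flight goroutine is still writing to.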
134 changes: 134 additions & 0 deletions receiver/prometheusremotewritereceiver/receiver_test.go
@@ -11,6 +11,7 @@ import (
"math"
"net/http"
"net/http/httptest"
"strconv"
"sync"
"testing"
"time"
@@ -1819,3 +1820,136 @@ func buildMetaDataMapByID(ms pmetric.Metrics) map[string]map[string]any {
}
return result
}

// TestConcurrentRequestsforSameResourceAttributes reproduces the concurrency bug where subsequent requests
// fail to reach the next consumer after the first successful request.
func TestConcurrentRequestsforSameResourceAttributes(t *testing.T) {
mockConsumer := &mockConsumer{}
prwReceiver := setupMetricsReceiver(t)
prwReceiver.nextConsumer = mockConsumer

// Create a test HTTP server
ts := httptest.NewServer(http.HandlerFunc(prwReceiver.handlePRW))
defer ts.Close()

// Create multiple requests with the same job/instance labels (triggering cache key collision)
createRequest := func(metricName string, value float64, timestamp int64) *writev2.Request {
return &writev2.Request{
Symbols: []string{
"", // 0
"__name__", metricName, // 1, 2
"job", "test_job", // 3, 4
"instance", "test_instance", // 5, 6
},
Timeseries: []writev2.TimeSeries{
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6},
Samples: []writev2.Sample{{Value: value, Timestamp: timestamp}},
},
},
}
}

// Prepare requests
requests := []*writev2.Request{}
for i := 0; i < 5; i++ {
requests = append(requests, createRequest("metric_"+strconv.Itoa(i+1), float64(i+1)*10, int64(i+1)*1000))
}

// Send requests concurrently
var wg sync.WaitGroup
var httpResults []int // Store HTTP status codes
var mu sync.Mutex

for i, req := range requests {
wg.Add(1)
go func(_ int, request *writev2.Request) {
defer wg.Done()

pBuf := proto.NewBuffer(nil)
err := pBuf.Marshal(request)
assert.NoError(t, err)

resp, err := http.Post(
ts.URL,
fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2),
bytes.NewBuffer(pBuf.Bytes()),
)
assert.NoError(t, err)
defer resp.Body.Close()

mu.Lock()
httpResults = append(httpResults, resp.StatusCode)
mu.Unlock()
}(i, req)
}
wg.Wait()

// Give some time for async processing
time.Sleep(100 * time.Millisecond)

// Analyze results
mockConsumer.mu.Lock()
receivedMetrics := len(mockConsumer.metrics)
totalDataPoints := mockConsumer.dataPoints
mockConsumer.mu.Unlock()

// Verify all HTTP requests succeeded
for i, status := range httpResults {
assert.Equal(t, http.StatusNoContent, status, "Request %d should return 204", i+1)
}

// The expected behavior is:
// - All HTTP requests return 204 (success)
// - All 5 data points are present (no data loss)
// - Each of the 5 requests produces a resource metrics entry with 1 data point; the resource attributes are identical since all requests share the same job/instance labels.
// - The cache should hold a single entry for that job/instance pair.
assert.Equal(t, 5, totalDataPoints)
assert.Equal(t, 1, prwReceiver.rmCache.Len())

// Additional debugging info
t.Logf("Metrics batches received: %d", receivedMetrics)
for i, metrics := range mockConsumer.metrics {
t.Logf("Batch %d: %d resource metrics, %d data points",
i+1, metrics.ResourceMetrics().Len(), metrics.DataPointCount())
}

// Additional analysis: Check if we're getting mixed data due to concurrent mutations

t.Logf("=== DETAILED ANALYSIS ===")
for i, metrics := range mockConsumer.metrics {
resourceMetrics := metrics.ResourceMetrics()
t.Logf("Batch %d:", i+1)
for j := 0; j < resourceMetrics.Len(); j++ {
rm := resourceMetrics.At(j)
scopeMetrics := rm.ScopeMetrics()
for k := 0; k < scopeMetrics.Len(); k++ {
scope := scopeMetrics.At(k)
metricsList := scope.Metrics()
for l := 0; l < metricsList.Len(); l++ {
metric := metricsList.At(l)
t.Logf(" - Metric: %s, DataPoints: %d", metric.Name(), metric.Gauge().DataPoints().Len())
}
}
}
}

// Verify thread safety: Check that metrics are properly consolidated without corruption
for i, metrics := range mockConsumer.metrics {
if metrics.DataPointCount() > 0 {
resourceMetrics := metrics.ResourceMetrics()
for j := 0; j < resourceMetrics.Len(); j++ {
rm := resourceMetrics.At(j)
scopeMetrics := rm.ScopeMetrics()
for k := 0; k < scopeMetrics.Len(); k++ {
scope := scopeMetrics.At(k)
metricsCount := scope.Metrics().Len()
if metricsCount != 1 {
t.Errorf("Batch %d: Found %d datapoints when it should be 1", i+1, metricsCount)
}
}
}
}
}
}
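
The test hinges on all five requests hashing to the same cache key. As a quick illustration of why they collide, here is the same key construction used in receiver.go (job and instance joined by a 0xff separator), assuming the github.com/cespare/xxhash/v2 module; this is only a sketch for clarity, not part of the test.

package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

func main() {
	// Same construction as receiver.go: hash of job + 0xff separator + instance.
	key := func(job, instance string) uint64 {
		return xxhash.Sum64String(job + string([]byte{'\xff'}) + instance)
	}

	// Every request in the test uses job="test_job" and instance="test_instance",
	// so all five map to one rmCache entry and exercise the shared-resource path.
	fmt.Println(key("test_job", "test_instance") == key("test_job", "test_instance"))  // true
	fmt.Println(key("test_job", "other_instance") == key("test_job", "test_instance")) // false
}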