Skip to content

Commit 4f10361

Browse files
[receiver/azuremonitorreceiver] parallelize calls by subscriptions in Batch API mode (#43047)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR revives the previously non-functional #40344. It improves performance by collecting metrics across Azure subscriptions in parallel, which significantly reduces latency when dealing with multiple subscriptions.

Compared to the original PR, this version fixes a critical concurrency issue:

```
fatal error: concurrent map read and map write
```

Reference: https://victoriametrics.com/blog/go-sync-map/

To resolve this, I evaluated multiple approaches and found that the concurrent-map library gave the best results. Details of the evaluation and the benchmarks are documented in concurrency_bench_report.md.

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

Fixes #39417

<!--Describe what testing was performed and which tests were added.-->
#### Testing

During testing, a race condition was discovered in the Azure SDK. It is related to the fallback behavior of the Cloud option in the NewClient function:

```go
// NewClient creates a client that accesses Azure Monitor metrics data.
// Client should be used for performing metrics queries on multiple monitored resources in the same region.
// A credential with authorization at the subscription level is required when using this client.
//
// endpoint - The regional endpoint to use, for example https://eastus.metrics.monitor.azure.com.
// The region should match the region of the requested resources. For global resources, the region should be 'global'.
func NewClient(endpoint string, credential azcore.TokenCredential, options *ClientOptions) (*Client, error) {
	if options == nil {
		options = &ClientOptions{}
	}
	if reflect.ValueOf(options.Cloud).IsZero() {
		options.Cloud = cloud.AzurePublic // <-- HERE
	}
	c, ok := options.Cloud.Services[ServiceName]
	if !ok || c.Audience == "" {
		return nil, errors.New("provided Cloud field is missing Azure Monitor Metrics configuration")
	}
```

When Cloud is unset, NewClient writes the default into the options value it was given. Concurrent calls that share one options value therefore race on that write. To prevent this, our implementation explicitly sets the Cloud option in all cases, ensuring deterministic behavior and avoiding the race. In addition, the options resolver now builds a fresh options value for every client instead of sharing one.

<!--Describe the documentation added.-->
#### Documentation

A new markdown file (concurrency_bench_report.md) has been added to document:

- The rationale behind choosing concurrent-map
- Benchmark results comparing different implementations
- Notes for future contributors who may want to explore alternative concurrency strategies

<!--Please delete paragraphs that you did not use before submitting.-->

---------

Signed-off-by: Célian Garcia <[email protected]>
Signed-off-by: Celian GARCIA <[email protected]>
Co-authored-by: Moritz Wiesinger <[email protected]>
1 parent 4cf90fa commit 4f10361

File tree

10 files changed

+353
-63
lines changed

10 files changed

+353
-63
lines changed

.chloggen/celian-garcia_39417.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receiver/azuremonitor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Parallelize calls across subscriptions in Batch API mode
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39417]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package azuremonitorreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver"
5+
6+
import (
7+
"sync"
8+
9+
cmap "github.com/orcaman/concurrent-map/v2"
10+
)
11+
12+
// concurrentMetricsBuilderMap abstracts the string-keyed concurrent map
// implementations in this file so they can be swapped and benchmarked
// against each other.
type concurrentMetricsBuilderMap[V any] interface {
	// Get returns the value stored under key and whether the key was present.
	Get(key string) (V, bool)
	// Set stores value under key, replacing any previous value.
	Set(key string, value V)
	// Clear removes every entry.
	Clear()
	// Range calls fn once per entry, in no particular order. Some
	// implementations hold internal locks while fn runs, so fn should not
	// call back into the same map.
	Range(fn func(key string, value V))
}
18+
19+
// Implementation with concurrent-map (generic API)
20+
type concurrentMapImpl[V any] struct {
21+
m cmap.ConcurrentMap[string, V]
22+
}
23+
24+
func newConcurrentMapImpl[V any]() concurrentMetricsBuilderMap[V] {
25+
return &concurrentMapImpl[V]{m: cmap.New[V]()}
26+
}
27+
28+
func (c *concurrentMapImpl[V]) Get(key string) (V, bool) {
29+
return c.m.Get(key)
30+
}
31+
32+
func (c *concurrentMapImpl[V]) Set(key string, value V) {
33+
c.m.Set(key, value)
34+
}
35+
36+
func (c *concurrentMapImpl[V]) Clear() {
37+
c.m.Clear()
38+
}
39+
40+
func (c *concurrentMapImpl[V]) Range(f func(string, V)) {
41+
c.m.IterCb(f)
42+
}
43+
44+
// Implementation with sync.Map
45+
46+
type syncMapImpl[V any] struct {
47+
m sync.Map
48+
}
49+
50+
func NewSyncMapImpl[V any]() concurrentMetricsBuilderMap[V] {
51+
return &syncMapImpl[V]{}
52+
}
53+
54+
func (s *syncMapImpl[V]) Get(key string) (V, bool) {
55+
v, ok := s.m.Load(key)
56+
if !ok {
57+
var zero V
58+
return zero, false
59+
}
60+
return v.(V), true
61+
}
62+
63+
func (s *syncMapImpl[V]) Set(key string, value V) {
64+
s.m.Store(key, value)
65+
}
66+
67+
func (s *syncMapImpl[V]) Clear() {
68+
s.m.Range(func(k, _ any) bool {
69+
s.m.Delete(k)
70+
return true
71+
})
72+
}
73+
74+
func (s *syncMapImpl[V]) Range(f func(string, V)) {
75+
s.m.Range(func(k, v any) bool {
76+
f(k.(string), v.(V))
77+
return true
78+
})
79+
}
80+
81+
// Implementation with classic map and mutex
82+
83+
type mutexMapImpl[V any] struct {
84+
m map[string]V
85+
mutex sync.RWMutex
86+
}
87+
88+
func NewMutexMapImpl[V any]() concurrentMetricsBuilderMap[V] {
89+
return &mutexMapImpl[V]{m: make(map[string]V)}
90+
}
91+
92+
func (mm *mutexMapImpl[V]) Get(key string) (V, bool) {
93+
mm.mutex.RLock()
94+
defer mm.mutex.RUnlock()
95+
v, ok := mm.m[key]
96+
return v, ok
97+
}
98+
99+
func (mm *mutexMapImpl[V]) Set(key string, value V) {
100+
mm.mutex.Lock()
101+
defer mm.mutex.Unlock()
102+
mm.m[key] = value
103+
}
104+
105+
func (mm *mutexMapImpl[V]) Clear() {
106+
mm.mutex.Lock()
107+
defer mm.mutex.Unlock()
108+
mm.m = make(map[string]V)
109+
}
110+
111+
func (mm *mutexMapImpl[V]) Range(f func(string, V)) {
112+
mm.mutex.RLock()
113+
defer mm.mutex.RUnlock()
114+
for k, v := range mm.m {
115+
f(k, v)
116+
}
117+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Concurrency Map Benchmark Report
2+
3+
## Context
4+
This benchmark compares three concurrent map implementations in Go:
5+
- **concurrentMapImpl**: Based on github.com/orcaman/concurrent-map (generic API)
6+
- **syncMapImpl**: Based on Go's built-in sync.Map
7+
- **mutexMapImpl**: Classic map protected by sync.RWMutex
8+
9+
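Benchmarks were run on two datasets, using parallel Set/Get operations and multiple CPU counts (1, 2, 4, 8):
- a small dataset: the map starts empty, and keys are drawn at random from 100,000 values;
- a large dataset: the map is pre-filled with 1 million entries, and keys are then drawn either from the existing range or from up to 10 million values.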
10+
11+
## Results Summary
12+
13+
### Small Dataset (Random keys)
14+
- **concurrentMapImpl**: Fastest, minimal memory usage, scales well with CPU count.
15+
- **syncMapImpl**: Slowest, highest memory allocation, scales with CPU but remains less efficient.
16+
- **mutexMapImpl**: Intermediate performance, low memory usage, slightly less scalable with more CPUs.
17+
18+
### Large Dataset (1 million entries)
19+
- **concurrentMapImpl**: Remains fastest overall. At 8 CPUs its latency is roughly tied with syncMapImpl in the example output below, but it allocates far less. Memory usage stays low (32–54 B/op, 1 alloc/op).
20+
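- **syncMapImpl**: Generally the slowest in these runs, with the highest memory allocation (107–110 B/op, 4 allocs/op). However, in the 8-CPU example output below its latency is on par with concurrentMapImpl, and mutexMapImpl is the slowest.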
21+
- **mutexMapImpl**: Good for moderate concurrency, memory usage low, but performance drops with more CPUs.
22+
23+
## Recommendations
24+
- For high concurrency and large datasets, **concurrentMapImpl** is the best choice.
25+
- For simple or low-concurrency use cases, **mutexMapImpl** is efficient and easy to maintain.
26+
- **syncMapImpl** is not recommended for performance-critical scenarios due to its overhead.
27+
28+
## Example Benchmark Output
29+
```
30+
BenchmarkConcurrentMapImplLarge-8 341.9 ns/op 32 B/op 1 allocs/op
31+
BenchmarkSyncMapImplLarge-8 342.1 ns/op 107 B/op 4 allocs/op
32+
BenchmarkMutexMapImplLarge-8 748.2 ns/op 31 B/op 1 allocs/op
33+
```
34+
35+
## Conclusion
36+
The generic concurrent-map implementation offers the best performance and scalability for concurrent workloads in Go. The classic mutex-protected map is a good fallback for simpler cases. Avoid sync.Map for intensive workloads.
37+
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package azuremonitorreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/azuremonitorreceiver"
5+
6+
import (
7+
"math/rand/v2"
8+
"strconv"
9+
"testing"
10+
)
11+
12+
func benchmarkMapImpl(b *testing.B, m concurrentMetricsBuilderMap[int]) {
13+
b.RunParallel(func(pb *testing.PB) {
14+
for pb.Next() {
15+
key := strconv.Itoa(rand.IntN(100000))
16+
m.Set(key, rand.Int())
17+
_, _ = m.Get(key)
18+
}
19+
})
20+
}
21+
22+
func BenchmarkConcurrentMapImpl(b *testing.B) {
23+
m := newConcurrentMapImpl[int]()
24+
benchmarkMapImpl(b, m)
25+
}
26+
27+
func BenchmarkSyncMapImpl(b *testing.B) {
28+
m := NewSyncMapImpl[int]()
29+
benchmarkMapImpl(b, m)
30+
}
31+
32+
func BenchmarkMutexMapImpl(b *testing.B) {
33+
m := NewMutexMapImpl[int]()
34+
benchmarkMapImpl(b, m)
35+
}
36+
37+
func benchmarkMapImplLarge(b *testing.B, m concurrentMetricsBuilderMap[int]) {
38+
// Pre-fill with 1 million entries
39+
for i := 0; i < 1_000_000; i++ {
40+
key := strconv.Itoa(i)
41+
m.Set(key, i)
42+
}
43+
b.ResetTimer()
44+
b.RunParallel(func(pb *testing.PB) {
45+
for pb.Next() {
46+
// Randomly access existing and new keys
47+
if rand.IntN(2) == 0 {
48+
key := strconv.Itoa(rand.IntN(1_000_000)) // existing
49+
m.Set(key, rand.Int())
50+
_, _ = m.Get(key)
51+
} else {
52+
key := strconv.Itoa(rand.IntN(10_000_000)) // possibly new
53+
m.Set(key, rand.Int())
54+
_, _ = m.Get(key)
55+
}
56+
}
57+
})
58+
}
59+
60+
func BenchmarkConcurrentMapImplLarge(b *testing.B) {
61+
m := newConcurrentMapImpl[int]()
62+
benchmarkMapImplLarge(b, m)
63+
}
64+
65+
func BenchmarkSyncMapImplLarge(b *testing.B) {
66+
m := NewSyncMapImpl[int]()
67+
benchmarkMapImplLarge(b, m)
68+
}
69+
70+
func BenchmarkMutexMapImplLarge(b *testing.B) {
71+
m := NewMutexMapImpl[int]()
72+
benchmarkMapImplLarge(b, m)
73+
}

receiver/azuremonitorreceiver/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ require (
1111
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0
1212
github.com/google/go-cmp v0.7.0
1313
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.137.0
14-
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.137.0
14+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.136.0
15+
github.com/orcaman/concurrent-map/v2 v2.0.1
1516
github.com/stretchr/testify v1.11.1
1617
go.opentelemetry.io/collector/component v1.43.0
1718
go.opentelemetry.io/collector/component/componenttest v0.137.0

receiver/azuremonitorreceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/azuremonitorreceiver/options.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ type ClientOptionsResolver interface {
1818
}
1919

2020
type clientOptionsResolver struct {
21-
armOptions *arm.ClientOptions
22-
azmetricsOptions *azmetrics.ClientOptions
21+
cloud cloud.Configuration
2322
}
2423

2524
// newClientOptionsResolver creates a resolver that will always return the same options.
2625
// Unlike the tests, which use one set of options per API mock, here we don't need different options for each client.
26+
// Note that the options are rebuilt on every call rather than shared. They are mutable and may be modified by the client constructor, so a shared instance would race when clients are created concurrently.
2727
func newClientOptionsResolver(cloudStr string) ClientOptionsResolver {
2828
var cloudToUse cloud.Configuration
2929
switch cloudStr {
@@ -34,25 +34,35 @@ func newClientOptionsResolver(cloudStr string) ClientOptionsResolver {
3434
default:
3535
cloudToUse = cloud.AzurePublic
3636
}
37-
return &clientOptionsResolver{armOptions: &arm.ClientOptions{
38-
ClientOptions: azcore.ClientOptions{
39-
Cloud: cloudToUse,
40-
},
41-
}}
37+
return &clientOptionsResolver{cloud: cloudToUse}
38+
}
39+
40+
func (r *clientOptionsResolver) getClientOptions() azcore.ClientOptions {
41+
return azcore.ClientOptions{
42+
Cloud: r.cloud,
43+
}
4244
}
4345

4446
func (r *clientOptionsResolver) GetArmResourceClientOptions(_ string) *arm.ClientOptions {
45-
return r.armOptions
47+
return &arm.ClientOptions{
48+
ClientOptions: r.getClientOptions(),
49+
}
4650
}
4751

4852
func (r *clientOptionsResolver) GetArmSubscriptionsClientOptions() *arm.ClientOptions {
49-
return r.armOptions
53+
return &arm.ClientOptions{
54+
ClientOptions: r.getClientOptions(),
55+
}
5056
}
5157

5258
func (r *clientOptionsResolver) GetArmMonitorClientOptions() *arm.ClientOptions {
53-
return r.armOptions
59+
return &arm.ClientOptions{
60+
ClientOptions: r.getClientOptions(),
61+
}
5462
}
5563

5664
func (r *clientOptionsResolver) GetAzMetricsClientOptions() *azmetrics.ClientOptions {
57-
return r.azmetricsOptions
65+
return &azmetrics.ClientOptions{
66+
ClientOptions: r.getClientOptions(),
67+
}
5868
}

receiver/azuremonitorreceiver/options_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func newMockClientOptionsResolver(
111111
}
112112
armResourcesClientOptions[subID] = &arm.ClientOptions{
113113
ClientOptions: azcore.ClientOptions{
114+
Cloud: cloud.AzurePublic, // Ensure Cloud client options is set. This is important to prevent race condition in the client constructor.
114115
Transport: armresourcesfake.NewServerTransport(&resourceServer),
115116
},
116117
}
@@ -123,6 +124,7 @@ func newMockClientOptionsResolver(
123124
}
124125
armSubscriptionsClientOptions := &arm.ClientOptions{
125126
ClientOptions: azcore.ClientOptions{
127+
Cloud: cloud.AzurePublic, // Ensure Cloud client options is set. This is important to prevent race condition in the client constructor.
126128
Transport: armsubscriptionsfake.NewServerTransport(&subscriptionsServer),
127129
},
128130
}
@@ -138,6 +140,7 @@ func newMockClientOptionsResolver(
138140
}
139141
armMonitorClientOptions := &arm.ClientOptions{
140142
ClientOptions: azcore.ClientOptions{
143+
Cloud: cloud.AzurePublic, // Ensure Cloud client options is set. This is important to prevent race condition in the client constructor.
141144
Transport: armmonitorfake.NewServerFactoryTransport(&armMonitorServerFactory),
142145
},
143146
}
@@ -148,6 +151,7 @@ func newMockClientOptionsResolver(
148151
}
149152
azMetricsClientOptions := &azmetrics.ClientOptions{
150153
ClientOptions: azcore.ClientOptions{
154+
Cloud: cloud.AzurePublic, // Ensure Cloud client options is set. This is important to prevent race condition in the client constructor.
151155
Transport: azmetricsfake.NewServerTransport(&azMetricsServer),
152156
},
153157
}

0 commit comments

Comments
 (0)