
Commit 97659b5

h0cheung and ChrsMark authored
feat (processor/k8sattributes): wait for synced when starting k8sattributes processor. (#32622)
**Description:** When starting the `k8sattributes` processor, block until an initial metadata synchronization has been completed. This solves #32556.

**Link to tracking issue:** Fixes #32556

**Testing:** Tested in a cluster with constant, high span traffic; after this PR there are no more spans with unassociated k8s metadata.

Co-authored-by: Christos Markou <[email protected]>
1 parent c7ecf2c commit 97659b5

File tree

15 files changed: +238 −109 lines changed


.chloggen/k8sattributes-block.yaml

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: processor/k8sattributes
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Block when starting until the metadata have been synced, to fix that some data couldn't be associated with metadata when the agent was just started.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [32556]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []

processor/k8sattributesprocessor/README.md

Lines changed: 15 additions & 0 deletions
@@ -198,6 +198,21 @@ the processor associates the received trace to the pod, based on the connection
 }
 ```
 
+By default, the processor is ready as soon as it starts, even if no metadata has been fetched yet.
+If data is sent to this processor before the metadata is synced, there will be no metadata to enrich the data with.
+
+To wait for the metadata to be synced before the processor reports ready, set the `wait_for_metadata` option to `true`.
+The processor will then not be ready until the metadata is fully synced, so the start-up of the Collector is blocked.
+If the metadata cannot be synced before the timeout is reached, the processor fails to start and returns an error, which causes the Collector to exit.
+The timeout defaults to 10s and can be configured with the `wait_for_metadata_timeout` option.
+
+Example of setting the processor to wait for metadata to be synced before it is ready:
+
+```yaml
+wait_for_metadata: true
+wait_for_metadata_timeout: 10s
+```
+
 ## Extracting attributes from pod labels and annotations
 
 The k8sattributesprocessor can also set resource attributes from k8s labels and annotations of pods, namespaces and nodes.

processor/k8sattributesprocessor/client_test.go

Lines changed: 6 additions & 3 deletions
@@ -4,6 +4,8 @@
 package k8sattributesprocessor
 
 import (
+    "time"
+
     "go.opentelemetry.io/collector/component"
     "k8s.io/apimachinery/pkg/fields"
     "k8s.io/apimachinery/pkg/labels"
@@ -35,7 +37,7 @@ func selectors() (labels.Selector, fields.Selector) {
 }
 
 // newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
-func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet) (kube.Client, error) {
+func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) {
     cs := fake.NewSimpleClientset()
 
     ls, fs := selectors()
@@ -70,10 +72,11 @@ func (f *fakeClient) GetNode(nodeName string) (*kube.Node, bool) {
 }
 
 // Start is a noop for FakeClient.
-func (f *fakeClient) Start() {
+func (f *fakeClient) Start() error {
     if f.Informer != nil {
-        f.Informer.Run(f.StopCh)
+        go f.Informer.Run(f.StopCh)
     }
+    return nil
 }
 
 // Stop is a noop for FakeClient.
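Since `kube.Client.Start` now returns an error, every implementation, including the test fake above, must change its signature in lockstep. A minimal standalone sketch of how a compile-time assertion surfaces that kind of interface drift (the trimmed `Client` interface and `fakeClient` here are illustrative, not the processor's actual types):

```go
package main

import "fmt"

// Client is a trimmed stand-in for the kube.Client interface; after this
// commit its Start method returns an error instead of nothing.
type Client interface {
	Start() error
	Stop()
}

// fakeClient is a test double; the assertion below fails to compile if its
// methods drift from the interface, which is how a signature change like the
// one in this diff shows up in the fakes.
type fakeClient struct{}

var _ Client = (*fakeClient)(nil)

func (f *fakeClient) Start() error { return nil } // noop, mirrors the updated fake
func (f *fakeClient) Stop()        {}

func main() {
	var c Client = &fakeClient{}
	if err := c.Start(); err != nil {
		fmt.Println("start failed:", err)
	}
}
```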

processor/k8sattributesprocessor/config.go

Lines changed: 7 additions & 0 deletions
@@ -6,6 +6,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr
 import (
     "fmt"
     "regexp"
+    "time"
 
     "go.opentelemetry.io/collector/featuregate"
     conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
@@ -46,6 +47,12 @@ type Config struct {
     // Exclude section allows to define names of pod that should be
     // ignored while tagging.
     Exclude ExcludeConfig `mapstructure:"exclude"`
+
+    // WaitForMetadata is a flag that determines if the processor should wait for k8s metadata to be synced when starting.
+    WaitForMetadata bool `mapstructure:"wait_for_metadata"`
+
+    // WaitForMetadataTimeout is the maximum time the processor will wait for the k8s metadata to be synced.
+    WaitForMetadataTimeout time.Duration `mapstructure:"wait_for_metadata_timeout"`
 }
 
 func (cfg *Config) Validate() error {
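The two new fields are filled in by the Collector's configuration unmarshalling, which decodes a YAML value such as `10s` into a `time.Duration` via the `mapstructure` tags. A minimal standalone sketch of that decoding, using `github.com/mitchellh/mapstructure` directly as a stand-in for the Collector's own confmap machinery (the struct, map values, and `15s` timeout below are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/mitchellh/mapstructure"
)

// exampleConfig mirrors just the two new fields; the tag names match the diff above.
type exampleConfig struct {
	WaitForMetadata        bool          `mapstructure:"wait_for_metadata"`
	WaitForMetadataTimeout time.Duration `mapstructure:"wait_for_metadata_timeout"`
}

func main() {
	// Values as they would look after the YAML file has been parsed into a map.
	raw := map[string]any{
		"wait_for_metadata":         true,
		"wait_for_metadata_timeout": "15s",
	}

	var cfg exampleConfig
	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		// The hook converts the "15s" string into a time.Duration.
		DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
		Result:     &cfg,
	})
	if err != nil {
		panic(err)
	}
	if err := dec.Decode(raw); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {WaitForMetadata:true WaitForMetadataTimeout:15s}
}
```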

processor/k8sattributesprocessor/config_test.go

Lines changed: 5 additions & 0 deletions
@@ -6,6 +6,7 @@ package k8sattributesprocessor
 import (
     "path/filepath"
     "testing"
+    "time"
 
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
@@ -34,6 +35,7 @@ func TestLoadConfig(t *testing.T) {
             Extract: ExtractConfig{
                 Metadata: enabledAttributes(),
             },
+            WaitForMetadataTimeout: 10 * time.Second,
         },
     },
     {
@@ -105,6 +107,7 @@
                 {Name: "jaeger-collector"},
             },
         },
+        WaitForMetadataTimeout: 10 * time.Second,
     },
 },
 {
@@ -127,6 +130,7 @@
                 {Name: "jaeger-collector"},
             },
         },
+        WaitForMetadataTimeout: 10 * time.Second,
     },
 },
 {
@@ -149,6 +153,7 @@
                 {Name: "jaeger-collector"},
             },
         },
+        WaitForMetadataTimeout: 10 * time.Second,
     },
 },
 {

processor/k8sattributesprocessor/factory.go

Lines changed: 7 additions & 0 deletions
@@ -5,6 +5,7 @@ package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetr
 
 import (
     "context"
+    "time"
 
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/consumer"
@@ -44,6 +45,7 @@ func createDefaultConfig() component.Config {
         Extract: ExtractConfig{
             Metadata: enabledAttributes(),
         },
+        WaitForMetadataTimeout: 10 * time.Second,
     }
 }
 
@@ -202,5 +204,10 @@ func createProcessorOpts(cfg component.Config) []option {
 
     opts = append(opts, withExcludes(oCfg.Exclude))
 
+    opts = append(opts, withWaitForMetadataTimeout(oCfg.WaitForMetadataTimeout))
+    if oCfg.WaitForMetadata {
+        opts = append(opts, withWaitForMetadata(true))
+    }
+
     return opts
 }
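`createProcessorOpts` now appends `withWaitForMetadataTimeout` and, when enabled, `withWaitForMetadata`; the definitions of those helpers are not part of this diff. A rough sketch of how such functional options typically look, assuming the processor follows the usual `option func(*processor) error` shape (the `kubernetesprocessor` struct and `newProcessor` below are illustrative stand-ins, not the component's actual code):

```go
package main

import (
	"fmt"
	"time"
)

// kubernetesprocessor is trimmed to the two fields relevant here.
type kubernetesprocessor struct {
	waitForMetadata        bool
	waitForMetadataTimeout time.Duration
}

// option mutates the processor during construction, mirroring the
// functional-options style used by createProcessorOpts above.
type option func(*kubernetesprocessor) error

// withWaitForMetadata records whether Start should block on the initial cache sync.
func withWaitForMetadata(wait bool) option {
	return func(p *kubernetesprocessor) error {
		p.waitForMetadata = wait
		return nil
	}
}

// withWaitForMetadataTimeout records how long Start may block before giving up.
func withWaitForMetadataTimeout(timeout time.Duration) option {
	return func(p *kubernetesprocessor) error {
		p.waitForMetadataTimeout = timeout
		return nil
	}
}

// newProcessor applies the options the way the factory's caller would.
func newProcessor(opts ...option) (*kubernetesprocessor, error) {
	p := &kubernetesprocessor{}
	for _, opt := range opts {
		if err := opt(p); err != nil {
			return nil, err
		}
	}
	return p, nil
}

func main() {
	p, err := newProcessor(
		withWaitForMetadataTimeout(10*time.Second),
		withWaitForMetadata(true),
	)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p) // &{waitForMetadata:true waitForMetadataTimeout:10s}
}
```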

processor/k8sattributesprocessor/generated_component_test.go

Lines changed: 0 additions & 38 deletions
Some generated files are not rendered by default.

processor/k8sattributesprocessor/internal/kube/client.go

Lines changed: 66 additions & 31 deletions
@@ -5,6 +5,7 @@ package kube // import "github.com/open-telemetry/opentelemetry-collector-contri
 
 import (
     "context"
+    "errors"
     "fmt"
     "regexp"
     "strings"
@@ -39,18 +40,20 @@ var enableRFC3339Timestamp = featuregate.GlobalRegistry().MustRegister(
 
 // WatchClient is the main interface provided by this package to a kubernetes cluster.
 type WatchClient struct {
-    m                  sync.RWMutex
-    deleteMut          sync.Mutex
-    logger             *zap.Logger
-    kc                 kubernetes.Interface
-    informer           cache.SharedInformer
-    namespaceInformer  cache.SharedInformer
-    nodeInformer       cache.SharedInformer
-    replicasetInformer cache.SharedInformer
-    replicasetRegex    *regexp.Regexp
-    cronJobRegex       *regexp.Regexp
-    deleteQueue        []deleteRequest
-    stopCh             chan struct{}
+    m                      sync.RWMutex
+    deleteMut              sync.Mutex
+    logger                 *zap.Logger
+    kc                     kubernetes.Interface
+    informer               cache.SharedInformer
+    namespaceInformer      cache.SharedInformer
+    nodeInformer           cache.SharedInformer
+    replicasetInformer     cache.SharedInformer
+    replicasetRegex        *regexp.Regexp
+    cronJobRegex           *regexp.Regexp
+    deleteQueue            []deleteRequest
+    stopCh                 chan struct{}
+    waitForMetadata        bool
+    waitForMetadataTimeout time.Duration
 
     // A map containing Pod related data, used to associate them with resources.
     // Key can be either an IP address or Pod UID
@@ -84,21 +87,36 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
 var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)
 
 // New initializes a new k8s Client.
-func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules ExtractionRules, filters Filters, associations []Association, exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newNamespaceInformer InformerProviderNamespace, newReplicaSetInformer InformerProviderReplicaSet) (Client, error) {
+func New(
+    set component.TelemetrySettings,
+    apiCfg k8sconfig.APIConfig,
+    rules ExtractionRules,
+    filters Filters,
+    associations []Association,
+    exclude Excludes,
+    newClientSet APIClientsetProvider,
+    newInformer InformerProvider,
+    newNamespaceInformer InformerProviderNamespace,
+    newReplicaSetInformer InformerProviderReplicaSet,
+    waitForMetadata bool,
+    waitForMetadataTimeout time.Duration,
+) (Client, error) {
     telemetryBuilder, err := metadata.NewTelemetryBuilder(set)
     if err != nil {
        return nil, err
     }
     c := &WatchClient{
-        logger:           set.Logger,
-        Rules:            rules,
-        Filters:          filters,
-        Associations:     associations,
-        Exclude:          exclude,
-        replicasetRegex:  rRegex,
-        cronJobRegex:     cronJobRegex,
-        stopCh:           make(chan struct{}),
-        telemetryBuilder: telemetryBuilder,
+        logger:                 set.Logger,
+        Rules:                  rules,
+        Filters:                filters,
+        Associations:           associations,
+        Exclude:                exclude,
+        replicasetRegex:        rRegex,
+        cronJobRegex:           cronJobRegex,
+        stopCh:                 make(chan struct{}),
+        telemetryBuilder:       telemetryBuilder,
+        waitForMetadata:        waitForMetadata,
+        waitForMetadataTimeout: waitForMetadataTimeout,
     }
     go c.deleteLoop(time.Second*30, defaultPodDeleteGracePeriod)
 
@@ -189,50 +207,67 @@ func New(set component.TelemetrySettings, apiCfg k8sconfig.APIConfig, rules Extr
 }
 
 // Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
-func (c *WatchClient) Start() {
-    _, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+func (c *WatchClient) Start() error {
+    synced := make([]cache.InformerSynced, 0)
+    reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc:    c.handlePodAdd,
         UpdateFunc: c.handlePodUpdate,
         DeleteFunc: c.handlePodDelete,
     })
     if err != nil {
-        c.logger.Error("error adding event handler to pod informer", zap.Error(err))
+        return err
     }
+    synced = append(synced, reg.HasSynced)
     go c.informer.Run(c.stopCh)
 
-    _, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+    reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.handleNamespaceAdd,
        UpdateFunc: c.handleNamespaceUpdate,
        DeleteFunc: c.handleNamespaceDelete,
     })
     if err != nil {
-        c.logger.Error("error adding event handler to namespace informer", zap.Error(err))
+        return err
     }
+    synced = append(synced, reg.HasSynced)
     go c.namespaceInformer.Run(c.stopCh)
 
     if c.Rules.DeploymentName || c.Rules.DeploymentUID {
-        _, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        reg, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    c.handleReplicaSetAdd,
            UpdateFunc: c.handleReplicaSetUpdate,
            DeleteFunc: c.handleReplicaSetDelete,
        })
        if err != nil {
-            c.logger.Error("error adding event handler to replicaset informer", zap.Error(err))
+            return err
        }
+        synced = append(synced, reg.HasSynced)
        go c.replicasetInformer.Run(c.stopCh)
     }
 
     if c.nodeInformer != nil {
-        _, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    c.handleNodeAdd,
            UpdateFunc: c.handleNodeUpdate,
            DeleteFunc: c.handleNodeDelete,
        })
        if err != nil {
-            c.logger.Error("error adding event handler to node informer", zap.Error(err))
+            return err
        }
+        synced = append(synced, reg.HasSynced)
        go c.nodeInformer.Run(c.stopCh)
     }
+
+    if c.waitForMetadata {
+        timeoutCh := make(chan struct{})
+        t := time.AfterFunc(c.waitForMetadataTimeout, func() {
+            close(timeoutCh)
+        })
+        defer t.Stop()
+        if !cache.WaitForCacheSync(timeoutCh, synced...) {
+            return errors.New("failed to wait for caches to sync")
+        }
+    }
+    return nil
 }
 
 // Stop signals the the k8s watcher/informer to stop watching for new events.
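The heart of the new behaviour in `Start` is the timeout-bounded wait: `time.AfterFunc` closes a channel after `waitForMetadataTimeout`, and `cache.WaitForCacheSync` polls the collected `HasSynced` functions until they all report true or the channel closes. A self-contained sketch of the same pattern against a fake clientset; the informer construction here is illustrative, and only the wait logic mirrors the diff:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

// waitForSync mirrors the pattern added to WatchClient.Start above: a timer
// closes a channel after `timeout`, and WaitForCacheSync polls the HasSynced
// functions until they all report true or the channel is closed.
func waitForSync(timeout time.Duration, synced ...cache.InformerSynced) error {
	timeoutCh := make(chan struct{})
	t := time.AfterFunc(timeout, func() {
		close(timeoutCh)
	})
	defer t.Stop()
	if !cache.WaitForCacheSync(timeoutCh, synced...) {
		return errors.New("failed to wait for caches to sync")
	}
	return nil
}

func main() {
	// A fake clientset keeps the sketch runnable without a live cluster.
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()

	stopCh := make(chan struct{})
	defer close(stopCh)
	go podInformer.Run(stopCh)

	// Block start-up (here, just main) until the pod cache is synced or 10s pass.
	if err := waitForSync(10*time.Second, podInformer.HasSynced); err != nil {
		fmt.Println("start aborted:", err)
		return
	}
	fmt.Println("metadata synced; processor can start enriching data")
}
```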

0 commit comments
