Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion exporter/loadbalancingexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/ebitengine/purego v0.8.4 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
github.com/foxboron/go-tpm-keyfiles v0.0.0-20250323135004-b31fac66206e // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions exporter/loadbalancingexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 19 additions & 13 deletions exporter/loadbalancingexporter/resolver_k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -101,18 +101,23 @@ func newK8sResolver(clt kubernetes.Interface,
}
}

epsSelector := fmt.Sprintf("metadata.name=%s", name)
labelSelector := discoveryv1.LabelServiceName + "=" + name
timeoutSeconds := ptr.To[int64](int64(timeout.Seconds()))
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = labelSelector
options.TimeoutSeconds = timeoutSeconds
return clt.DiscoveryV1().EndpointSlices(namespace).List(context.Background(), options)
}

watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = labelSelector
options.TimeoutSeconds = timeoutSeconds
return clt.DiscoveryV1().EndpointSlices(namespace).Watch(context.Background(), options)
}

epsListWatcher := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = epsSelector
options.TimeoutSeconds = ptr.To[int64](int64(timeout.Seconds()))
return clt.CoreV1().Endpoints(namespace).List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = epsSelector
options.TimeoutSeconds = ptr.To[int64](int64(timeout.Seconds()))
return clt.CoreV1().Endpoints(namespace).Watch(context.Background(), options)
},
ListFunc: listFunc,
WatchFunc: watchFunc,
}

epsStore := &sync.Map{}
Expand All @@ -121,6 +126,7 @@ func newK8sResolver(clt kubernetes.Interface,
logger: logger,
telemetry: tb,
returnNames: returnNames,
ports: ports,
}
r := &k8sResolver{
logger: logger,
Expand All @@ -146,7 +152,7 @@ func (r *k8sResolver) start(_ context.Context) error {
r.once.Do(func() {
if r.epsListWatcher != nil {
r.logger.Debug("creating and starting endpoints informer")
epsInformer := cache.NewSharedInformer(r.epsListWatcher, &corev1.Endpoints{}, 0)
epsInformer := cache.NewSharedInformer(r.epsListWatcher, &discoveryv1.EndpointSlice{}, 0)
if _, err := epsInformer.AddEventHandler(r.handler); err != nil {
r.logger.Error("unable to start watching for changes to the specified service names", zap.Error(err))
}
Expand Down
34 changes: 33 additions & 1 deletion exporter/loadbalancingexporter/resolver_k8s_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry

import (
"context"
"slices"
"sync"

"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata"
Expand All @@ -27,21 +29,28 @@ type handler struct {
logger *zap.Logger
telemetry *metadata.TelemetryBuilder
returnNames bool
ports []int32
}

func (h handler) OnAdd(obj any, _ bool) {
var endpoints map[string]bool
var ok bool

switch object := obj.(type) {
case *discoveryv1.EndpointSlice:
ok, endpoints = convertEndpointSlices(object, h.ports)
if !ok {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}
case *corev1.Endpoints:
ok, endpoints = convertToEndpoints(h.returnNames, object)
if !ok {
h.logger.Warn(epMissingHostnamesMsg, zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
return
}

default: // unsupported
h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj))
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
Expand All @@ -60,6 +69,8 @@ func (h handler) OnAdd(obj any, _ bool) {

func (h handler) OnUpdate(oldObj, newObj any) {
switch oldEps := oldObj.(type) {
case *discoveryv1.EndpointSlice:
// TODO
case *corev1.Endpoints:
newEps, ok := newObj.(*corev1.Endpoints)
if !ok {
Expand Down Expand Up @@ -114,6 +125,8 @@ func (h handler) OnDelete(obj any) {
case *cache.DeletedFinalStateUnknown:
h.OnDelete(object.Obj)
return
case *discoveryv1.EndpointSlice:
// TODO
case *corev1.Endpoints:
if object != nil {
ok, endpoints = convertToEndpoints(h.returnNames, object)
Expand Down Expand Up @@ -154,3 +167,22 @@ func convertToEndpoints(retNames bool, eps ...*corev1.Endpoints) (bool, map[stri
}
return true, res
}

func convertEndpointSlices(slice *discoveryv1.EndpointSlice, ports []int32) (bool, map[string]bool) {
res := map[string]bool{}

for _, port := range slice.Ports {
if slices.Contains(ports, *port.Port) {
for _, ep := range slice.Endpoints {
for _, addr := range ep.Addresses {
// TODO: support FQDN and IPv6?
if slice.AddressType == discoveryv1.AddressTypeIPv4 {
res[addr] = true
}
}
}
}
}

return true, res
}
Loading
Loading