Merged
30 commits
d837282
automatic resource attributes
zeitlinger Apr 11, 2025
677d462
rename to service rules
zeitlinger Apr 11, 2025
105cfdf
remove exclude
zeitlinger Apr 11, 2025
f7991de
remove exclude
zeitlinger Apr 11, 2025
6345139
cleanup
zeitlinger Apr 11, 2025
912bf7c
cleanup
zeitlinger Apr 11, 2025
2300f1b
cleanup
zeitlinger Apr 11, 2025
474f622
fix test
zeitlinger Apr 11, 2025
0f425c4
lint
zeitlinger Apr 11, 2025
08858ca
readme, changelog
zeitlinger Apr 11, 2025
bdf6acf
readme, changelog
zeitlinger Apr 11, 2025
f0f7d3c
added note
zeitlinger Apr 17, 2025
9500f72
pr review
zeitlinger Apr 22, 2025
f1d0618
fix
zeitlinger May 13, 2025
c2b6488
fix
zeitlinger May 13, 2025
4b25a30
put service attribute config in extract.metadata
zeitlinger May 14, 2025
4570828
put service attribute config in extract.metadata
zeitlinger May 15, 2025
5c1f903
put service attribute config in extract.metadata
zeitlinger May 15, 2025
c5b62a0
put service attribute config in extract.metadata
zeitlinger May 15, 2025
febd8f7
add docs
zeitlinger May 15, 2025
7e93ba9
generate
zeitlinger May 15, 2025
ee3b3bc
generate
zeitlinger May 15, 2025
4bc8fa9
gci
zeitlinger May 15, 2025
40ef149
gci
zeitlinger May 15, 2025
1c16195
fix todo
zeitlinger May 15, 2025
0a574d1
simplify logic
zeitlinger May 16, 2025
6c5c964
docs
zeitlinger May 20, 2025
b45f22f
container can never be a fallback for service name
zeitlinger May 20, 2025
e437d36
container can never be a fallback for service name
zeitlinger May 20, 2025
7760e5d
container can never be a fallback for service name
zeitlinger May 20, 2025
17 changes: 17 additions & 0 deletions .chloggen/service-resource-attributes.yaml
@@ -0,0 +1,17 @@
change_type: enhancement

component: k8sattributesprocessor

note: Add option to configure automatic service resource attributes

issues: [37114]

subtext: |
Implements [Service Attributes](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#service-attributes).

If you are using the file log receiver, you can now create the same resource attributes that are set on traces received (via OTLP)
from an application instrumented with the OpenTelemetry Operator
by adding the
`extract: { metadata: ["service.namespace", "service.name", "service.version", "service.instance.id"] }`
configuration to the `k8sattributesprocessor`.
See the [documentation](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/k8sattributesprocessor/README.md#configuring-recommended-resource-attributes) for more details.
17 changes: 17 additions & 0 deletions processor/k8sattributesprocessor/README.md
@@ -93,6 +93,10 @@ are then also available for the use within association rules. Available attribut
- k8s.job.name
- k8s.node.name
- k8s.cluster.uid
- [service.namespace](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#how-servicenamespace-should-be-calculated)
- [service.name](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#how-servicename-should-be-calculated)
- [service.version](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#how-serviceversion-should-be-calculated)
- [service.instance.id](https://opentelemetry.io/docs/specs/semconv/non-normative/k8s-attributes/#how-serviceinstanceid-should-be-calculated)
- Any tags extracted from the pod labels and annotations, as described in [extracting attributes from pod labels and annotations](#extracting-attributes-from-pod-labels-and-annotations)


@@ -108,11 +112,15 @@ correctly associate the matching container to the resource:
- container.image.name
- container.image.tag
- container.image.repo_digests (if k8s CRI populates [repository digest field](https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/model/registry/container.yaml#L60-L71))
- service.version
- service.instance.id
2. If the `k8s.container.name` resource attribute is provided, the following additional attributes will be available:
- container.id (if the `k8s.container.restart_count` resource attribute is not provided, it's not guaranteed to get the right container ID.)
- container.image.name
- container.image.tag
- container.image.repo_digests (if k8s CRI populates [repository digest field](https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/model/registry/container.yaml#L60-L71))
- service.version
- service.instance.id
3. If the `k8s.container.restart_count` resource attribute is provided, it can be used to associate with a particular container
instance. If it's not set, the latest container instance will be used:
- container.id (not added by default, has to be specified in `metadata`)
@@ -262,6 +270,11 @@ The processor can be configured to set the
```yaml
extract:
otel_annotations: true
metadata:
- service.namespace
- service.name
- service.version
- service.instance.id
```

### Config example
@@ -283,6 +296,10 @@ k8sattributes/2:
- k8s.namespace.name
- k8s.node.name
- k8s.pod.start_time
- service.namespace
- service.name
- service.version
- service.instance.id
labels:
# This label extraction rule takes the value 'app.kubernetes.io/component' label and maps it to the 'app.label.component' attribute which will be added to the associated resources
- tag_name: app.label.component
2 changes: 2 additions & 0 deletions processor/k8sattributesprocessor/config.go
@@ -101,6 +101,8 @@ func (cfg *Config) Validate() error {
string(conventions.K8SNodeNameKey), string(conventions.K8SNodeUIDKey),
string(conventions.K8SContainerNameKey), string(conventions.ContainerIDKey),
string(conventions.ContainerImageNameKey), string(conventions.ContainerImageTagKey),
string(conventions.ServiceNamespaceKey), string(conventions.ServiceNameKey),
string(conventions.ServiceVersionKey), string(conventions.ServiceInstanceIDKey),
containerImageRepoDigests, clusterUID:
default:
return fmt.Errorf("\"%s\" is not a supported metadata field", field)
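
The `Validate` hunk above adds the four `service.*` keys to the set of accepted `extract.metadata` field names. As a rough illustration of that allow-list check, here is a minimal stdlib-only sketch. The `supportedMetadata` map and the `validateMetadataField` helper are hypothetical names, and the map lists only the fields visible in this hunk, not the processor's full list.

```go
package main

import "fmt"

// supportedMetadata is an illustrative subset of accepted field names,
// limited to the ones visible in the hunk above (assumption: the real
// switch statement accepts more fields than are shown here).
var supportedMetadata = map[string]struct{}{
	"k8s.node.name":        {},
	"k8s.node.uid":         {},
	"k8s.container.name":   {},
	"container.id":         {},
	"container.image.name": {},
	"container.image.tag":  {},
	"service.namespace":    {},
	"service.name":         {},
	"service.version":      {},
	"service.instance.id":  {},
}

// validateMetadataField is a hypothetical helper that rejects any field
// name missing from the allow-list, using the error text from the diff.
func validateMetadataField(field string) error {
	if _, ok := supportedMetadata[field]; !ok {
		return fmt.Errorf("\"%s\" is not a supported metadata field", field)
	}
	return nil
}

func main() {
	for _, field := range []string{"service.name", "service.owner"} {
		fmt.Println(field, "->", validateMetadataField(field))
	}
}
```

A map lookup is used here only to keep the sketch short; the processor itself uses a `switch` over the semantic-convention constants.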
4 changes: 4 additions & 0 deletions processor/k8sattributesprocessor/documentation.md
@@ -31,6 +31,10 @@
| k8s.replicaset.uid | The UID of the ReplicaSet. | Any Str | false |
| k8s.statefulset.name | The name of the StatefulSet. | Any Str | false |
| k8s.statefulset.uid | The UID of the StatefulSet. | Any Str | false |
| service.instance.id | The instance ID of the service. | Any Str | false |
| service.name | The name of the service. | Any Str | false |
| service.namespace | The namespace of the service. | Any Str | false |
| service.version | The version of the service. | Any Str | false |

## Internal Telemetry

2 changes: 1 addition & 1 deletion processor/k8sattributesprocessor/go.mod
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sat
go 1.23.0

require (
github.com/distribution/reference v0.6.0
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.126.0
@@ -48,7 +49,6 @@ require (
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/docker v28.1.1+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
133 changes: 116 additions & 17 deletions processor/k8sattributesprocessor/internal/kube/client.go
@@ -12,7 +12,9 @@ import (
"sync"
"time"

"github.com/distribution/reference"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/otel/attribute"
conventions "go.opentelemetry.io/otel/semconv/v1.6.1"
"go.uber.org/zap"
apps_v1 "k8s.io/api/apps/v1"
@@ -77,6 +79,8 @@ var rRegex = regexp.MustCompile(`^(.*)-[0-9a-zA-Z]+$`)
// format: [cronjob-name]-[time-hash-int]
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)

var errCannotRetrieveImage = errors.New("cannot retrieve image name")

// New initializes a new k8s Client.
func New(
set component.TelemetrySettings,
@@ -453,6 +457,9 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.PodName {
tags[string(conventions.K8SPodNameKey)] = pod.Name
}
if c.Rules.ServiceName {
tags[string(conventions.ServiceNameKey)] = pod.Name
}

if c.Rules.PodHostName {
tags[tagHostName] = pod.Spec.Hostname
@@ -487,7 +494,7 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
c.Rules.JobUID || c.Rules.JobName ||
c.Rules.StatefulSetUID || c.Rules.StatefulSetName ||
c.Rules.DeploymentName || c.Rules.DeploymentUID ||
c.Rules.CronJobName {
c.Rules.CronJobName || c.Rules.ServiceName {
for _, ref := range pod.OwnerReferences {
switch ref.Kind {
case "ReplicaSet":
@@ -497,10 +504,20 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.ReplicaSetName {
tags[string(conventions.K8SReplicaSetNameKey)] = ref.Name
}
if c.Rules.DeploymentName {
if c.Rules.ServiceName {
tags[string(conventions.ServiceNameKey)] = ref.Name
}
if c.Rules.DeploymentName || c.Rules.ServiceName {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
tags[string(conventions.K8SDeploymentNameKey)] = replicaset.Deployment.Name
name := replicaset.Deployment.Name
if name != "" {
if c.Rules.DeploymentName {
tags[string(conventions.K8SDeploymentNameKey)] = name
}
if c.Rules.ServiceName {
// deployment name wins over replicaset name
tags[string(conventions.ServiceNameKey)] = name
}
}
}
}
@@ -518,26 +535,42 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.DaemonSetName {
tags[string(conventions.K8SDaemonSetNameKey)] = ref.Name
}
if c.Rules.ServiceName {
tags[string(conventions.ServiceNameKey)] = ref.Name
}
case "StatefulSet":
if c.Rules.StatefulSetUID {
tags[string(conventions.K8SStatefulSetUIDKey)] = string(ref.UID)
}
if c.Rules.StatefulSetName {
tags[string(conventions.K8SStatefulSetNameKey)] = ref.Name
}
case "Job":
if c.Rules.CronJobName {
parts := c.cronJobRegex.FindStringSubmatch(ref.Name)
if len(parts) == 2 {
tags[string(conventions.K8SCronJobNameKey)] = parts[1]
}
if c.Rules.ServiceName {
tags[string(conventions.ServiceNameKey)] = ref.Name
}
case "Job":
if c.Rules.JobUID {
tags[string(conventions.K8SJobUIDKey)] = string(ref.UID)
}
if c.Rules.JobName {
tags[string(conventions.K8SJobNameKey)] = ref.Name
}
if c.Rules.ServiceName {
tags[string(conventions.ServiceNameKey)] = ref.Name
}
if c.Rules.CronJobName || c.Rules.ServiceName {
parts := c.cronJobRegex.FindStringSubmatch(ref.Name)
if len(parts) == 2 {
name := parts[1]
if c.Rules.CronJobName {
tags[string(conventions.K8SCronJobNameKey)] = name
}
if c.Rules.ServiceName {
// cronjob name wins over job name
tags[string(conventions.ServiceNameKey)] = name
}
}
}
}
}
}
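
Taken together, the owner-reference hunks above suggest a fallback order for `service.name`: the pod name is the default, a ReplicaSet, DaemonSet, StatefulSet or Job owner overrides it, and a Deployment (for ReplicaSets) or CronJob (for Jobs) overrides that in turn. The following is a minimal stdlib-only sketch of that order, not the processor's code. The `ownerRef` type, the `serviceNameFromOwner` helper and the `deploymentOf` map are hypothetical; the real code resolves the Deployment through `c.getReplicaSet` keyed by UID, whereas this sketch keys the lookup by ReplicaSet name for brevity.

```go
package main

import (
	"fmt"
	"regexp"
)

// ownerRef is a hypothetical stand-in for a Kubernetes owner reference.
type ownerRef struct {
	Kind string
	Name string
}

// Same pattern as cronJobRegex in the diff: [cronjob-name]-[time-hash-int].
var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)

// serviceNameFromOwner is a hypothetical helper. It starts from the pod
// name and lets each owner kind overwrite it, mirroring the order of the
// assignments in the diff. deploymentOf maps a ReplicaSet name to its
// Deployment name (assumption: the real lookup is by ReplicaSet UID).
func serviceNameFromOwner(podName string, refs []ownerRef, deploymentOf map[string]string) string {
	name := podName
	for _, ref := range refs {
		switch ref.Kind {
		case "ReplicaSet":
			name = ref.Name
			// Deployment name wins over ReplicaSet name.
			if deployment := deploymentOf[ref.Name]; deployment != "" {
				name = deployment
			}
		case "DaemonSet", "StatefulSet":
			name = ref.Name
		case "Job":
			name = ref.Name
			// CronJob name wins over Job name when the Job name
			// ends in "-<digits>".
			if parts := cronJobRegex.FindStringSubmatch(ref.Name); len(parts) == 2 {
				name = parts[1]
			}
		}
	}
	return name
}

func main() {
	deployments := map[string]string{"checkout-7d9f": "checkout"}
	replicaSetOwner := []ownerRef{{Kind: "ReplicaSet", Name: "checkout-7d9f"}}
	jobOwner := []ownerRef{{Kind: "Job", Name: "backup-28999"}}

	fmt.Println(serviceNameFromOwner("checkout-7d9f-abcde", replicaSetOwner, deployments))
	fmt.Println(serviceNameFromOwner("backup-28999-xyz", jobOwner, nil))
	fmt.Println(serviceNameFromOwner("standalone", nil, nil))
}
```

One consequence of the regex-based CronJob detection is that any Job whose name ends in `-<digits>` is treated as if it were created by a CronJob, whether or not one exists.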
@@ -558,12 +591,28 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
r.extractFromPodMetadata(pod.Labels, tags, "k8s.pod.labels.%s")
}

if c.Rules.ServiceName {
copyLabel(pod, tags, "app.kubernetes.io/name", conventions.ServiceNameKey)
// app.kubernetes.io/instance has a higher precedence than app.kubernetes.io/name
copyLabel(pod, tags, "app.kubernetes.io/instance", conventions.ServiceNameKey)
}

if c.Rules.ServiceVersion {
copyLabel(pod, tags, "app.kubernetes.io/version", conventions.ServiceVersionKey)
}

for _, r := range c.Rules.Annotations {
r.extractFromPodMetadata(pod.Annotations, tags, "k8s.pod.annotations.%s")
}
return tags
}

func copyLabel(pod *api_v1.Pod, tags map[string]string, labelKey string, key attribute.Key) {
if val, ok := pod.Labels[labelKey]; ok {
tags[string(key)] = val
}
}
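
Because `copyLabel` overwrites the tag whenever the label is present, the order of the calls in this hunk determines precedence: a value derived earlier (for example from an owner reference) is replaced by `app.kubernetes.io/name`, which is in turn replaced by `app.kubernetes.io/instance`. A minimal sketch of that behaviour follows, assuming a plain label map instead of `*api_v1.Pod` so that no Kubernetes imports are needed; `copyLabelFromMap` and `applyServiceLabels` are hypothetical names.

```go
package main

import "fmt"

// copyLabelFromMap is a simplified, hypothetical variant of copyLabel
// from the diff: it takes a label map rather than a pod.
func copyLabelFromMap(labels, tags map[string]string, labelKey, key string) {
	if val, ok := labels[labelKey]; ok {
		tags[key] = val
	}
}

// applyServiceLabels is a hypothetical helper that repeats the call
// order from the diff; later calls overwrite earlier values.
func applyServiceLabels(labels, tags map[string]string) {
	copyLabelFromMap(labels, tags, "app.kubernetes.io/name", "service.name")
	copyLabelFromMap(labels, tags, "app.kubernetes.io/instance", "service.name")
	copyLabelFromMap(labels, tags, "app.kubernetes.io/version", "service.version")
}

func main() {
	// service.name was already set from an owner reference.
	tags := map[string]string{"service.name": "checkout-deployment"}
	labels := map[string]string{
		"app.kubernetes.io/name":     "checkout",
		"app.kubernetes.io/instance": "checkout-eu",
		"app.kubernetes.io/version":  "1.4.2",
	}
	applyServiceLabels(labels, tags)
	fmt.Println(tags["service.name"], tags["service.version"]) // prints "checkout-eu 1.4.2"
}
```

If a pod carries neither label, the tags map is left untouched and the owner-derived name survives.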

// This function removes all data from the Pod except what is required by extraction rules and pod association
func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Pod {
// name, namespace, uid, start time and ip are needed for identifying Pods
@@ -626,7 +675,7 @@ func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Po
removeUnnecessaryContainerData := func(c api_v1.Container) api_v1.Container {
transformedContainer := api_v1.Container{}
transformedContainer.Name = c.Name // we always need the name, it's used for identification
if rules.ContainerImageName || rules.ContainerImageTag {
if rules.ContainerImageName || rules.ContainerImageTag || rules.ServiceVersion {
transformedContainer.Image = c.Image
}
return transformedContainer
@@ -644,7 +693,7 @@ func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Po
}
}

if len(rules.Labels) > 0 {
if len(rules.Labels) > 0 || rules.ServiceName || rules.ServiceVersion {
transformedPod.Labels = pod.Labels
}

@@ -659,6 +708,38 @@ func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Po
return &transformedPod
}

// parseServiceVersionFromImage parses the service version for differently-formatted image names
// according to https://github.com/open-telemetry/semantic-conventions/blob/main/docs/non-normative/k8s-attributes.md#how-serviceversion-should-be-calculated
func parseServiceVersionFromImage(image string) (string, error) {
ref, err := reference.Parse(image)
if err != nil {
return "", err
}

namedRef, ok := ref.(reference.Named)
if !ok {
return "", errCannotRetrieveImage
}
var tag, digest string
if taggedRef, ok := namedRef.(reference.Tagged); ok {
tag = taggedRef.Tag()
}
if digestedRef, ok := namedRef.(reference.Digested); ok {
digest = digestedRef.Digest().String()
}
if digest != "" {
if tag != "" {
return fmt.Sprintf("%s@%s", tag, digest), nil
}
return digest, nil
}
if tag != "" {
return tag, nil
}

return "", errCannotRetrieveImage
}
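
Once the tag and digest have been extracted, `parseServiceVersionFromImage` reduces to a small decision table: `tag@digest` when both are present, otherwise whichever one exists, otherwise an error. The sketch below isolates just that step with the standard library only. It assumes the image reference has already been parsed (the PR uses `github.com/distribution/reference` for that part); `versionFromTagAndDigest` and `errNoVersion` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoVersion is a hypothetical counterpart to errCannotRetrieveImage.
var errNoVersion = errors.New("image has neither tag nor digest")

// versionFromTagAndDigest is a hypothetical helper covering only the
// final selection step; parsing the image string is out of scope here.
func versionFromTagAndDigest(tag, digest string) (string, error) {
	switch {
	case tag != "" && digest != "":
		return tag + "@" + digest, nil
	case digest != "":
		return digest, nil
	case tag != "":
		return tag, nil
	default:
		return "", errNoVersion
	}
}

func main() {
	cases := [][2]string{
		{"1.4.2", "sha256:abc"},
		{"", "sha256:abc"},
		{"1.4.2", ""},
		{"", ""},
	}
	for _, c := range cases {
		version, err := versionFromTagAndDigest(c[0], c[1])
		fmt.Printf("tag=%q digest=%q -> %q (err: %v)\n", c[0], c[1], version, err)
	}
}
```

The digest values here are shortened placeholders rather than valid digests.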

func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContainers {
containers := PodContainers{
ByID: map[string]*Container{},
@@ -667,7 +748,8 @@ func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContain
if !needContainerAttributes(c.Rules) {
return containers
}
if c.Rules.ContainerImageName || c.Rules.ContainerImageTag {
if c.Rules.ContainerImageName || c.Rules.ContainerImageTag ||
c.Rules.ServiceVersion || c.Rules.ServiceInstanceID {
for _, spec := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
container := &Container{}
imageRef, err := dcommon.ParseImageName(spec.Image)
@@ -678,18 +760,28 @@ func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContain
if c.Rules.ContainerImageTag {
container.ImageTag = imageRef.Tag
}
if c.Rules.ServiceVersion {
serviceVersion, err := parseServiceVersionFromImage(spec.Image)
if err == nil {
container.ServiceVersion = serviceVersion
}
}
}
containers.ByName[spec.Name] = container
}
}
for _, apiStatus := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
container, ok := containers.ByName[apiStatus.Name]
containerName := apiStatus.Name
container, ok := containers.ByName[containerName]
if !ok {
container = &Container{}
containers.ByName[apiStatus.Name] = container
containers.ByName[containerName] = container
}
if c.Rules.ContainerName {
container.Name = apiStatus.Name
container.Name = containerName
}
if c.Rules.ServiceInstanceID {
container.ServiceInstanceID = automaticServiceInstanceID(pod, containerName)
}
containerID := apiStatus.ContainerID
// Remove container runtime prefix
@@ -1022,7 +1114,9 @@ func needContainerAttributes(rules ExtractionRules) bool {
rules.ContainerName ||
rules.ContainerImageTag ||
rules.ContainerImageRepoDigests ||
rules.ContainerID
rules.ContainerID ||
rules.ServiceVersion ||
rules.ServiceInstanceID
}

func (c *WatchClient) handleReplicaSetAdd(obj any) {
@@ -1127,3 +1221,8 @@ func ignoreDeletedFinalStateUnknown(obj any) any {
}
return obj
}

func automaticServiceInstanceID(pod *api_v1.Pod, containerName string) string {
resNames := []string{pod.Namespace, pod.Name, containerName}
return strings.Join(resNames, ".")
}
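
For reference, `automaticServiceInstanceID` yields a dot-separated `namespace.pod.container` string. A minimal sketch with plain string arguments instead of `*api_v1.Pod` (the `serviceInstanceID` name and the sample values are illustrative assumptions):

```go
package main

import (
	"fmt"
	"strings"
)

// serviceInstanceID is a hypothetical, pod-free variant of
// automaticServiceInstanceID from the diff.
func serviceInstanceID(namespace, podName, containerName string) string {
	return strings.Join([]string{namespace, podName, containerName}, ".")
}

func main() {
	fmt.Println(serviceInstanceID("shop", "checkout-7d9f-abcde", "app"))
	// prints "shop.checkout-7d9f-abcde.app"
}
```

Because the container name is part of the ID, `service.instance.id` is only available once the container can be identified, which matches the README change listing it under the container-level attributes.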