Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/net-cgroup-system-stats-monitor.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"net": {
"excludeInterfaceRegexp": "^(cali|tunl|veth)",
"metricsConfigs": {
"net/rx_bytes": {
"displayName": "net/rx_bytes"
Expand Down
2 changes: 2 additions & 0 deletions pkg/systemstatsmonitor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ Below metrics are collected from `net` component:

All of the above have an `interface_name` label identifying the network interface.

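Interfaces that don't add any value can be skipped: an interface whose name matches the regular expression set in the `excludeInterfaceRegexp` field of the `net` configuration (for example `^(cali|tunl|veth)`) is not recorded.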

## Windows Support

NPD has preliminary Windows support for the system stats monitor. The following modules are supported:
Expand Down
45 changes: 32 additions & 13 deletions pkg/systemstatsmonitor/net_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ import (
"github.com/prometheus/procfs"
)

// newInt64MetricFn is the signature of a constructor for int64 metrics that
// returns the metric as a metrics.Int64MetricInterface. newInt64Metric has this
// signature; any other function with the same shape can be used in its place.
type newInt64MetricFn func(metricID metrics.MetricID, viewName string, description string, unit string, aggregation metrics.Aggregation, tagNames []string) (metrics.Int64MetricInterface, error)

// newInt64Metric is a wrapper of metrics.NewInt64Metric that returns an
// interface instead of the specific type.
//
// All arguments are forwarded unchanged. When metrics.NewInt64Metric reports an
// error, a literal nil interface is returned together with that error, rather
// than an interface wrapping whatever concrete pointer came back, so the result
// never compares as non-nil on failure.
func newInt64Metric(metricID metrics.MetricID, viewName string, description string, unit string, aggregation metrics.Aggregation, tagNames []string) (metrics.Int64MetricInterface, error) {
	metric, err := metrics.NewInt64Metric(metricID, viewName, description, unit, aggregation, tagNames)
	if err != nil {
		return nil, err
	}
	return metric, nil
}

// netCollector holds the state used to collect network interface statistics.
type netCollector struct {
	// config is the net stats configuration, including the optional regexp
	// used to exclude interfaces by name.
	config *ssmtypes.NetStatsConfig
	// procPath is presumably the root of the proc filesystem to read from
	// (e.g. /proc) — confirm against the callers.
	procPath string
	// recorder holds the metrics registered for the network interfaces.
	recorder *ifaceStatRecorder
}

func NewNetCollectorOrDie(netConfig *ssmtypes.NetStatsConfig, procPath string) *netCollector {
nc := netCollector{
config: netConfig,
procPath: procPath,
recorder: newIfaceStatRecorder(),
}

func (nc *netCollector) initOrDie() {
nc.mustRegisterMetric(
metrics.NetDevRxBytes,
"Cumulative count of bytes received.",
Expand Down Expand Up @@ -191,8 +192,16 @@ func NewNetCollectorOrDie(netConfig *ssmtypes.NetStatsConfig, procPath string) *
return int64(stat.TxCompressed)
},
)
}

return &nc
// NewNetCollectorOrDie builds a netCollector for the given configuration and
// proc path, gives it a recorder that creates metrics through newInt64Metric,
// and runs initOrDie on it before returning it.
func NewNetCollectorOrDie(netConfig *ssmtypes.NetStatsConfig, procPath string) *netCollector {
	collector := new(netCollector)
	collector.config = netConfig
	collector.procPath = procPath
	collector.recorder = newIfaceStatRecorder(newInt64Metric)

	collector.initOrDie()
	return collector
}

func (nc *netCollector) mustRegisterMetric(metricID metrics.MetricID, description, unit string,
Expand All @@ -216,7 +225,12 @@ func (nc *netCollector) recordNetDev() {
return
}

excludeInterfaceRegexp := nc.config.ExcludeInterfaceRegexp.R
for iface, ifaceStats := range stats {
if excludeInterfaceRegexp != nil && excludeInterfaceRegexp.MatchString(iface) {
glog.V(6).Infof("Network interface %s matched exclude regexp %q, skipping recording", iface, excludeInterfaceRegexp)
continue
}
tags := map[string]string{}
tags[interfaceNameLabel] = iface

Expand All @@ -234,11 +248,16 @@ func (nc *netCollector) collect() {

// TODO(@oif): Maybe implement a generic recorder.
type ifaceStatRecorder struct {
collectors map[metrics.MetricID]ifaceStatCollector
// We use a function to allow injecting a mock for testing
newInt64Metric newInt64MetricFn
collectors map[metrics.MetricID]ifaceStatCollector
}

func newIfaceStatRecorder() *ifaceStatRecorder {
return &ifaceStatRecorder{collectors: make(map[metrics.MetricID]ifaceStatCollector)}
// newIfaceStatRecorder returns a recorder with no collectors registered yet.
// The newInt64Metric argument is the constructor the recorder uses to create
// its metrics, which lets callers supply a fake implementation.
func newIfaceStatRecorder(newInt64Metric newInt64MetricFn) *ifaceStatRecorder {
	recorder := new(ifaceStatRecorder)
	recorder.newInt64Metric = newInt64Metric
	recorder.collectors = map[metrics.MetricID]ifaceStatCollector{}
	return recorder
}

func (r *ifaceStatRecorder) Register(metricID metrics.MetricID, viewName string, description string,
Expand All @@ -247,7 +266,7 @@ func (r *ifaceStatRecorder) Register(metricID metrics.MetricID, viewName string,
// Check duplication
return fmt.Errorf("metric %q already registered", metricID)
}
metric, err := metrics.NewInt64Metric(metricID, viewName, description, unit, aggregation, tagNames)
metric, err := r.newInt64Metric(metricID, viewName, description, unit, aggregation, tagNames)
if err != nil {
return err
}
Expand All @@ -268,6 +287,6 @@ func (r ifaceStatRecorder) RecordWithSameTags(stat procfs.NetDevLine, tags map[s
}

type ifaceStatCollector struct {
metric *metrics.Int64Metric
metric metrics.Int64MetricInterface
exporter func(procfs.NetDevLine) int64
}
190 changes: 190 additions & 0 deletions pkg/systemstatsmonitor/net_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package systemstatsmonitor

import (
"io/ioutil"
"os"
"path"
"regexp"
"testing"

ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types"
"k8s.io/node-problem-detector/pkg/util/metrics"
)

// defaultMetricsConfig maps each net device metric ID listed below to a
// MetricConfig whose display name is the metric ID itself.
var defaultMetricsConfig = map[string]ssmtypes.MetricConfig{
	// Receive side.
	string(metrics.NetDevRxBytes):      {DisplayName: string(metrics.NetDevRxBytes)},
	string(metrics.NetDevRxPackets):    {DisplayName: string(metrics.NetDevRxPackets)},
	string(metrics.NetDevRxErrors):     {DisplayName: string(metrics.NetDevRxErrors)},
	string(metrics.NetDevRxDropped):    {DisplayName: string(metrics.NetDevRxDropped)},
	string(metrics.NetDevRxFifo):       {DisplayName: string(metrics.NetDevRxFifo)},
	string(metrics.NetDevRxFrame):      {DisplayName: string(metrics.NetDevRxFrame)},
	string(metrics.NetDevRxCompressed): {DisplayName: string(metrics.NetDevRxCompressed)},
	string(metrics.NetDevRxMulticast):  {DisplayName: string(metrics.NetDevRxMulticast)},
	// Transmit side.
	string(metrics.NetDevTxBytes):      {DisplayName: string(metrics.NetDevTxBytes)},
	string(metrics.NetDevTxPackets):    {DisplayName: string(metrics.NetDevTxPackets)},
	string(metrics.NetDevTxErrors):     {DisplayName: string(metrics.NetDevTxErrors)},
	string(metrics.NetDevTxDropped):    {DisplayName: string(metrics.NetDevTxDropped)},
	string(metrics.NetDevTxFifo):       {DisplayName: string(metrics.NetDevTxFifo)},
	string(metrics.NetDevTxCollisions): {DisplayName: string(metrics.NetDevTxCollisions)},
	string(metrics.NetDevTxCarrier):    {DisplayName: string(metrics.NetDevTxCarrier)},
	string(metrics.NetDevTxCompressed): {DisplayName: string(metrics.NetDevTxCompressed)},
}

// fakeNetProcContent is fake content for a proc net/dev file, written by the
// tests into a temporary proc directory.
// To get a similar output, run `cat /proc/net/dev` on a Linux machine.
// For reference, summing the three docker* rows below column by column gives:
// docker: 1500 100 8 7 0 0 0 0 9000 450 565 200 20 30 0 0
const fakeNetProcContent = `Inter-| Receive | Transmit
face |bytes packets errs drop fifo frame compressed multicast|bytes packets errs drop fifo colls carrier compressed
eth0: 5000 100 0 0 0 0 0 0 2500 30 0 0 0 0 0 0
docker0: 1000 90 8 7 0 0 0 0 0 0 0 0 0 0 0 0
docker1: 500 10 0 0 0 0 0 0 3000 150 15 0 20 30 0 0
docker2: 0 0 0 0 0 0 0 0 6000 300 550 200 0 0 0 0
`

// newFakeInt64Metric adapts metrics.NewFakeInt64Metric to the metric
// constructor signature accepted by newIfaceStatRecorder. Only viewName,
// aggregation and tagNames are forwarded; the returned error is always nil.
func newFakeInt64Metric(metricID metrics.MetricID, viewName, description, unit string, aggregation metrics.Aggregation, tagNames []string) (metrics.Int64MetricInterface, error) {
	fake := metrics.NewFakeInt64Metric(viewName, aggregation, tagNames)
	return fake, nil
}

// testCollectAux is a test auxiliary function used for testing
// netCollector.collect. It creates a temporary proc directory whose net/dev
// file contains fakeNetProcContent, builds a netCollector that uses fake
// metrics and the given exclude regexp, runs one collection and then hands the
// collector to validate.
//
// The name argument is currently unused; it is kept so that existing callers
// keep compiling.
func testCollectAux(t *testing.T, name string, excludeInterfaceRegexp ssmtypes.NetStatsInterfaceRegexp, validate func(*testing.T, *netCollector)) {
	t.Helper()

	// mkdir /tmp/proc-X
	procDir, err := ioutil.TempDir(os.TempDir(), "proc-")
	if err != nil {
		t.Fatalf("Failed to create temp proc directory: %v", err)
	}
	// rm -r /tmp/proc-X
	defer os.RemoveAll(procDir)

	// mkdir /tmp/proc-X/net
	procNetDir := path.Join(procDir, "net")
	if err := os.Mkdir(procNetDir, 0777); err != nil {
		t.Fatalf("Failed to create directory %q: %v", procNetDir, err)
	}

	// echo $FILE_CONTENT > /tmp/proc-X/net/dev
	// ioutil.WriteFile creates, writes and closes the file in a single call, so
	// no file handle is left open when the write fails.
	filename := path.Join(procNetDir, "dev")
	if err := ioutil.WriteFile(filename, []byte(fakeNetProcContent), 0666); err != nil {
		t.Fatalf("Failed to write file %q: %v", filename, err)
	}

	// Build the netCollector. The local is named nc so that it does not shadow
	// the netCollector type.
	nc := &netCollector{
		config: &ssmtypes.NetStatsConfig{
			ExcludeInterfaceRegexp: excludeInterfaceRegexp,
			MetricsConfigs:         defaultMetricsConfig,
		},
		procPath: procDir,
		recorder: newIfaceStatRecorder(newFakeInt64Metric),
	}
	nc.initOrDie()
	nc.collect()
	validate(t, nc)
}

func TestCollect(t *testing.T) {
tcs := []struct {
Name string
ExcludeInterfaceRegexp ssmtypes.NetStatsInterfaceRegexp
Validate func(t *testing.T, nc *netCollector)
}{
{
Name: "NoFilterMatch",
ExcludeInterfaceRegexp: ssmtypes.NetStatsInterfaceRegexp{R: regexp.MustCompile(`^fake$`)},
Validate: func(t *testing.T, nc *netCollector) {
// We just validate two metrics, no need to check all of them
expectedValues := map[metrics.MetricID]map[string]int64{
metrics.NetDevRxBytes: map[string]int64{
"eth0": 5000,
"docker0": 1000,
"docker1": 500,
"docker2": 0,
},
metrics.NetDevTxBytes: map[string]int64{
"eth0": 2500,
"docker0": 0,
"docker1": 3000,
"docker2": 6000,
},
}
for metricID, interfaceValues := range expectedValues {
collector, ok := nc.recorder.collectors[metricID]
if !ok {
t.Errorf("Failed to get collector of metric %s", metricID)
continue
}
fakeInt64Metric, ok := collector.metric.(*metrics.FakeInt64Metric)
if !ok {
t.Fatalf("Failed to convert metric %s to fakeMetric", string(metricID))
}
for _, repr := range fakeInt64Metric.ListMetrics() {
interfaceName, ok := repr.Labels[interfaceNameLabel]
if !ok {
t.Fatalf("Failed to get label %q for ", interfaceNameLabel)
}
expectedValue, ok := interfaceValues[interfaceName]
if !ok {
t.Fatalf("Failed to get metric value for interface %q", interfaceName)
}
if repr.Value != expectedValue {
t.Errorf("Mismatch in metric %q for interface %q: expected %d, got %d", metricID, interfaceName, expectedValue, repr.Value)
}
}
}
},
},
{
Name: "FilterMatch",
ExcludeInterfaceRegexp: ssmtypes.NetStatsInterfaceRegexp{R: regexp.MustCompile(`docker\d+`)},
Validate: func(t *testing.T, nc *netCollector) {
// We just validate two metrics, no need to check all of them
expectedValues := map[metrics.MetricID]map[string]int64{
metrics.NetDevRxBytes: map[string]int64{
"eth0": 5000,
},
metrics.NetDevTxBytes: map[string]int64{
"eth0": 2500,
},
}
for metricID, interfaceValues := range expectedValues {
collector, ok := nc.recorder.collectors[metricID]
if !ok {
t.Errorf("Failed to get collector of metric %s", metricID)
continue
}
fakeInt64Metric, ok := collector.metric.(*metrics.FakeInt64Metric)
if !ok {
t.Fatalf("Failed to convert metric %s to fakeMetric", string(metricID))
}
for _, repr := range fakeInt64Metric.ListMetrics() {
interfaceName, ok := repr.Labels[interfaceNameLabel]
if !ok {
t.Fatalf("Failed to get label %q for ", interfaceNameLabel)
}
expectedValue, ok := interfaceValues[interfaceName]
if !ok {
t.Fatalf("Failed to get metric value for interface %q", interfaceName)
}
if repr.Value != expectedValue {
t.Errorf("Mismatch in metric %q for interface %q: expected %d, got %d", metricID, interfaceName, expectedValue, repr.Value)
}
}
}
},
},
}
for _, tc := range tcs {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
testCollectAux(t, tc.Name, tc.ExcludeInterfaceRegexp, tc.Validate)
})
}
}
30 changes: 29 additions & 1 deletion pkg/systemstatsmonitor/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package types
import (
"fmt"
"os"
"regexp"
"time"
)

Expand Down Expand Up @@ -58,8 +59,35 @@ type OSFeatureStatsConfig struct {
KnownModulesConfigPath string `json:"knownModulesConfigPath"`
}

// NetStatsInterfaceRegexp wraps a *regexp.Regexp so that it can be used as a
// configuration field: in order to marshal/unmarshal the regexp, the wrapper
// implements the MarshalText/UnmarshalText methods.
// A nil R means that no regexp is configured.
type NetStatsInterfaceRegexp struct {
	R *regexp.Regexp
}

// UnmarshalText compiles data as a regular expression and stores it in r.R.
//
// Empty data means "no regexp": r.R is reset to nil, so decoding an empty
// value into a reused NetStatsInterfaceRegexp does not keep a stale pattern.
// If data is not a valid regular expression an error wrapping the compile
// error is returned and r.R is left untouched.
func (r *NetStatsInterfaceRegexp) UnmarshalText(data []byte) error {
	// We don't build Regexp if data is empty
	if len(data) == 0 {
		r.R = nil
		return nil
	}
	regex, err := regexp.Compile(string(data))
	if err != nil {
		return fmt.Errorf("invalid interface regexp %q: %w", data, err)
	}
	r.R = regex
	return nil
}

// MarshalText returns the source text of the wrapped regexp, or nil when no
// regexp is set. It never returns an error.
func (r NetStatsInterfaceRegexp) MarshalText() ([]byte, error) {
	if r.R == nil {
		return nil, nil
	}
	return []byte(r.R.String()), nil
}

type NetStatsConfig struct {
MetricsConfigs map[string]MetricConfig `json:"metricsConfigs"`
MetricsConfigs map[string]MetricConfig `json:"metricsConfigs"`
ExcludeInterfaceRegexp NetStatsInterfaceRegexp `json:"excludeInterfaceRegexp"`
}

type SystemStatsConfig struct {
Expand Down