Commit 7f13683

stats: add stats server
Signed-off-by: Peter Hunt <[email protected]>
1 parent 43db34f commit 7f13683

File tree

1 file changed, +339 -0 lines changed


internal/lib/stats/stats_server.go

Lines changed: 339 additions & 0 deletions
@@ -0,0 +1,339 @@
package statsserver

import (
	"context"
	"path/filepath"
	"sync"
	"time"

	"github.com/containernetworking/plugins/pkg/ns"
	cstorage "github.com/containers/storage"
	"github.com/cri-o/cri-o/internal/lib/sandbox"
	"github.com/cri-o/cri-o/internal/oci"
	"github.com/cri-o/cri-o/pkg/config"
	"github.com/cri-o/cri-o/server/cri/types"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"github.com/vishvananda/netlink"
)

// StatsServer is responsible for maintaining a list of container and sandbox stats.
// If collectionPeriod is > 0, it maintains this list by refreshing the stats once every collectionPeriod.
// Otherwise, it only updates the stats as they're requested.
type StatsServer struct {
	shutdown         chan struct{}
	collectionPeriod time.Duration
	sboxStats        map[string]*types.PodSandboxStats
	ctrStats         map[string]*types.ContainerStats
	parentServerIface
	sync.Mutex
}

// parentServerIface is an interface for requesting information from the parent ContainerServer.
// It decouples the stats server from the ContainerServer package, prevents data duplication
// (mainly in the active list of sandboxes), and avoids circular dependencies to boot.
type parentServerIface interface {
	Runtime() *oci.Runtime
	Store() cstorage.Store
	ListSandboxes() []*sandbox.Sandbox
	GetSandbox(string) *sandbox.Sandbox
	Config() *config.Config
}

// New returns a new StatsServer, deriving the needed information from the provided parentServerIface.
func New(cs parentServerIface) *StatsServer {
	ss := &StatsServer{
		shutdown:          make(chan struct{}, 1),
		collectionPeriod:  time.Duration(cs.Config().StatsCollectionPeriod) * time.Second,
		sboxStats:         make(map[string]*types.PodSandboxStats),
		ctrStats:          make(map[string]*types.ContainerStats),
		parentServerIface: cs,
	}
	go ss.updateLoop()
	return ss
}

// updateLoop refreshes the current list of stats once every collectionPeriod.
// If collectionPeriod is 0, it does nothing.
func (ss *StatsServer) updateLoop() {
	if ss.collectionPeriod == 0 {
		// fetch stats on-demand
		return
	}
	for {
		select {
		case <-ss.shutdown:
			return
		case <-time.After(ss.collectionPeriod):
		}
		ss.update()
	}
}

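Note: updateLoop blocks on either the shutdown channel or a timer; Shutdown (at the bottom of the file) closes the channel rather than sending on it, so the stop signal is observed whenever the loop next selects. A minimal, self-contained sketch of the same stop-or-tick pattern; the names below (runEvery, stop) are illustrative and not part of this commit:

package main

import (
	"fmt"
	"time"
)

// runEvery calls fn once every period until stop is closed.
// Closing stop (rather than sending on it) means the signal is
// delivered regardless of which select the loop is parked in.
func runEvery(period time.Duration, stop <-chan struct{}, fn func()) {
	for {
		select {
		case <-stop:
			return
		case <-time.After(period):
		}
		fn()
	}
}

func main() {
	stop := make(chan struct{})
	go runEvery(100*time.Millisecond, stop, func() { fmt.Println("tick") })
	time.Sleep(350 * time.Millisecond)
	close(stop) // broadcast shutdown to the loop
	time.Sleep(50 * time.Millisecond)
}
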
// update updates the list of container and sandbox stats.
// It does so by updating the stats of every sandbox, which in turn
// updates the stats for each container it has.
func (ss *StatsServer) update() {
	ss.Lock()
	defer ss.Unlock()

	for _, sb := range ss.ListSandboxes() {
		ss.updateSandbox(sb)
	}
}

// updateSandbox updates the StatsServer's entry for this sandbox, as well as each child container.
// It first populates the stats from the CgroupParent, then calculates network usage, updates
// the stats of each of its child containers by calling into the runtime, and finally calculates the usage nano cores.
func (ss *StatsServer) updateSandbox(sb *sandbox.Sandbox) *types.PodSandboxStats {
	if sb == nil {
		return nil
	}
	sandboxStats := &types.PodSandboxStats{
		Attributes: &types.PodSandboxAttributes{
			ID:          sb.ID(),
			Labels:      sb.Labels(),
			Metadata:    sb.Metadata(),
			Annotations: sb.Annotations(),
		},
	}
	if err := ss.Config().CgroupManager().PopulateSandboxCgroupStats(sb.CgroupParent(), sandboxStats); err != nil {
		logrus.Errorf("Error getting sandbox stats %s: %v", sb.ID(), err)
	}
	if err := ss.populateNetworkUsage(sandboxStats, sb); err != nil {
		logrus.Errorf("Error adding network stats for sandbox %s: %v", sb.ID(), err)
	}
	containerStats := make([]*types.ContainerStats, 0, len(sb.Containers().List()))
	for _, c := range sb.Containers().List() {
		if c.StateNoLock().Status == oci.ContainerStateStopped {
			continue
		}
		cStats, err := ss.Runtime().ContainerStats(context.TODO(), c, sb.CgroupParent())
		if err != nil {
			logrus.Errorf("Error getting container stats %s: %v", c.ID(), err)
			continue
		}
		ss.populateWritableLayer(cStats, c)
		if oldcStats, ok := ss.ctrStats[c.ID()]; ok {
			updateUsageNanoCores(oldcStats.CPU, cStats.CPU)
		}
		// Save the freshly gathered stats and include them in the sandbox's container list.
		ss.ctrStats[c.ID()] = cStats
		containerStats = append(containerStats, cStats)
	}
	sandboxStats.Containers = containerStats
	if old, ok := ss.sboxStats[sb.ID()]; ok {
		updateUsageNanoCores(old.CPU, sandboxStats.CPU)
	}
	ss.sboxStats[sb.ID()] = sandboxStats
	return sandboxStats
}

// updateContainer calls into the runtime handler to update the container stats,
// as well as populates the writable layer by calling into the container storage.
// If this container already existed in the stats server, the CPU nano cores are calculated as well.
func (ss *StatsServer) updateContainer(c *oci.Container, sb *sandbox.Sandbox) *types.ContainerStats {
	if c == nil || sb == nil {
		return nil
	}
	if c.StateNoLock().Status == oci.ContainerStateStopped {
		return nil
	}
	cStats, err := ss.Runtime().ContainerStats(context.TODO(), c, sb.CgroupParent())
	if err != nil {
		logrus.Errorf("Error getting container stats %s: %v", c.ID(), err)
		return nil
	}
	ss.populateWritableLayer(cStats, c)
	if oldcStats, ok := ss.ctrStats[c.ID()]; ok {
		updateUsageNanoCores(oldcStats.CPU, cStats.CPU)
	}
	ss.ctrStats[c.ID()] = cStats
	return cStats
}

// updateUsageNanoCores calculates the usage nano cores by averaging the CPU usage between the timestamps
// of the old usage and the recently gathered usage.
func updateUsageNanoCores(old, current *types.CPUUsage) {
	if old == nil || current == nil || old.UsageCoreNanoSeconds == nil || current.UsageCoreNanoSeconds == nil {
		return
	}

	nanoSeconds := current.Timestamp - old.Timestamp

	usageNanoCores := uint64(float64(current.UsageCoreNanoSeconds.Value-old.UsageCoreNanoSeconds.Value) /
		float64(nanoSeconds) * float64(time.Second/time.Nanosecond))

	current.UsageNanoCores = &types.UInt64Value{
		Value: usageNanoCores,
	}
}

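For intuition about the formula above: UsageCoreNanoSeconds is cumulative CPU time and Timestamp is in nanoseconds, so the result is CPU time consumed per second of wall-clock time, expressed in nanocores (one full core = 1e9 nanocores). A self-contained sketch of the same arithmetic, using local stand-in types rather than the CRI types:

package main

import (
	"fmt"
	"time"
)

// cpuSample stands in for the CRI CPUUsage fields used by updateUsageNanoCores.
type cpuSample struct {
	timestamp            int64  // nanoseconds
	usageCoreNanoSeconds uint64 // cumulative CPU time consumed, in nanoseconds
}

// usageNanoCores mirrors the calculation above: CPU time consumed
// per wall-clock second, expressed in nanocores.
func usageNanoCores(old, current cpuSample) uint64 {
	elapsed := current.timestamp - old.timestamp
	return uint64(float64(current.usageCoreNanoSeconds-old.usageCoreNanoSeconds) /
		float64(elapsed) * float64(time.Second/time.Nanosecond))
}

func main() {
	old := cpuSample{timestamp: 0, usageCoreNanoSeconds: 1_000_000_000}
	// Two seconds later, two more CPU-seconds have been consumed:
	// the container averaged exactly one core over the interval.
	current := cpuSample{timestamp: 2_000_000_000, usageCoreNanoSeconds: 3_000_000_000}
	fmt.Println(usageNanoCores(old, current)) // 1000000000 nanocores == 1 core
}
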
// populateWritableLayer attempts to populate the container stats writable layer.
func (ss *StatsServer) populateWritableLayer(stats *types.ContainerStats, container *oci.Container) {
	writableLayer, err := ss.writableLayerForContainer(container)
	if err != nil {
		logrus.Error(err)
	}
	stats.WritableLayer = writableLayer
}

// writableLayerForContainer gathers information about the container's writable layer.
// It does so by calling into the GraphDriver's endpoint to get the UsedBytes and InodesUsed.
func (ss *StatsServer) writableLayerForContainer(container *oci.Container) (*types.FilesystemUsage, error) {
	writableLayer := &types.FilesystemUsage{
		Timestamp: time.Now().UnixNano(),
		FsID:      &types.FilesystemIdentifier{Mountpoint: container.MountPoint()},
	}
	driver, err := ss.Store().GraphDriver()
	if err != nil {
		return writableLayer, errors.Wrapf(err, "Unable to get graph driver for disk usage for container %s", container.ID())
	}
	id := filepath.Base(filepath.Dir(container.MountPoint()))
	usage, err := driver.ReadWriteDiskUsage(id)
	if err != nil {
		return writableLayer, errors.Wrapf(err, "Unable to get disk usage for container %s", container.ID())
	}
	writableLayer.UsedBytes = &types.UInt64Value{Value: uint64(usage.Size)}
	writableLayer.InodesUsed = &types.UInt64Value{Value: uint64(usage.InodeCount)}

	return writableLayer, nil
}

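The layer ID passed to ReadWriteDiskUsage is derived from the mount point alone: with the overlay driver the writable layer is typically mounted at <storage-root>/overlay/<layer-id>/merged, so the base name of the mount point's parent directory is the layer ID. A small sketch of that derivation (the path and ID below are hypothetical):

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// Hypothetical overlay mount point for a container's writable layer.
	mountPoint := "/var/lib/containers/storage/overlay/3f1a9c70c9e5/merged"

	// Same derivation as writableLayerForContainer: the layer ID is the
	// base name of the mount point's parent directory.
	layerID := filepath.Base(filepath.Dir(mountPoint))
	fmt.Println(layerID) // 3f1a9c70c9e5
}
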
// populateNetworkUsage gathers information about the network from within the sandbox's network namespace.
func (ss *StatsServer) populateNetworkUsage(stats *types.PodSandboxStats, sb *sandbox.Sandbox) error {
	return ns.WithNetNSPath(sb.NetNsPath(), func(_ ns.NetNS) error {
		links, err := netlink.LinkList()
		if err != nil {
			logrus.Errorf("Unable to retrieve network namespace links: %v", err)
			return err
		}
		stats.Network = &types.NetworkUsage{
			Interfaces: make([]*types.NetworkInterfaceUsage, 0, len(links)-1),
		}
		for i := range links {
			iface, err := linkToInterface(links[i])
			if err != nil {
				logrus.Errorf("Failed to %v for pod %s", err, sb.ID())
				continue
			}
			// TODO FIXME or DefaultInterfaceName?
			if i == 0 {
				stats.Network.DefaultInterface = iface
			} else {
				stats.Network.Interfaces = append(stats.Network.Interfaces, iface)
			}
		}
		return nil
	})
}

// linkToInterface translates information found from the netlink package
// into the CRI NetworkInterfaceUsage structure.
func linkToInterface(link netlink.Link) (*types.NetworkInterfaceUsage, error) {
	attrs := link.Attrs()
	if attrs == nil {
		return nil, errors.New("get stats for iface")
	}
	if attrs.Statistics == nil {
		return nil, errors.Errorf("get stats for iface %s", attrs.Name)
	}
	return &types.NetworkInterfaceUsage{
		Name:     attrs.Name,
		RxBytes:  &types.UInt64Value{Value: attrs.Statistics.RxBytes},
		RxErrors: &types.UInt64Value{Value: attrs.Statistics.RxErrors},
		TxBytes:  &types.UInt64Value{Value: attrs.Statistics.TxBytes},
		TxErrors: &types.UInt64Value{Value: attrs.Statistics.TxErrors},
	}, nil
}

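As a standalone illustration of the netlink calls used by populateNetworkUsage and linkToInterface, the sketch below lists the links in the current network namespace and prints their Rx/Tx byte counters (Linux only; unlike the code above, it does not enter a sandbox's namespace):

package main

import (
	"fmt"
	"log"

	"github.com/vishvananda/netlink"
)

func main() {
	links, err := netlink.LinkList()
	if err != nil {
		log.Fatalf("listing links: %v", err)
	}
	for _, link := range links {
		attrs := link.Attrs()
		if attrs == nil || attrs.Statistics == nil {
			// Mirror linkToInterface: skip links without usable statistics.
			continue
		}
		fmt.Printf("%s: rx=%d bytes tx=%d bytes\n",
			attrs.Name, attrs.Statistics.RxBytes, attrs.Statistics.TxBytes)
	}
}
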
// StatsForSandbox returns the stats for the given sandbox.
func (ss *StatsServer) StatsForSandbox(sb *sandbox.Sandbox) *types.PodSandboxStats {
	ss.Lock()
	defer ss.Unlock()
	return ss.statsForSandbox(sb)
}

// StatsForSandboxes returns the stats for the given list of sandboxes.
func (ss *StatsServer) StatsForSandboxes(sboxes []*sandbox.Sandbox) []*types.PodSandboxStats {
	ss.Lock()
	defer ss.Unlock()
	stats := make([]*types.PodSandboxStats, 0, len(sboxes))
	for _, sb := range sboxes {
		if stat := ss.statsForSandbox(sb); stat != nil {
			stats = append(stats, stat)
		}
	}
	return stats
}

// statsForSandbox is an internal, non-locking version of StatsForSandbox
// that returns (and occasionally gathers) the stats for the given sandbox.
func (ss *StatsServer) statsForSandbox(sb *sandbox.Sandbox) *types.PodSandboxStats {
	if ss.collectionPeriod == 0 {
		return ss.updateSandbox(sb)
	}
	sboxStat, ok := ss.sboxStats[sb.ID()]
	if ok {
		return sboxStat
	}
	// Cache miss, try again
	return ss.updateSandbox(sb)
}

// RemoveStatsForSandbox removes the saved entry for the specified sandbox
// to prevent the map from always growing.
func (ss *StatsServer) RemoveStatsForSandbox(sb *sandbox.Sandbox) {
	ss.Lock()
	defer ss.Unlock()
	delete(ss.sboxStats, sb.ID())
}

// StatsForContainer returns the stats for the given container.
func (ss *StatsServer) StatsForContainer(c *oci.Container, sb *sandbox.Sandbox) *types.ContainerStats {
	ss.Lock()
	defer ss.Unlock()
	return ss.statsForContainer(c, sb)
}

// StatsForContainers returns the stats for the given list of containers.
func (ss *StatsServer) StatsForContainers(ctrs []*oci.Container) []*types.ContainerStats {
	ss.Lock()
	defer ss.Unlock()
	stats := make([]*types.ContainerStats, 0, len(ctrs))
	for _, c := range ctrs {
		sb := ss.GetSandbox(c.Sandbox())
		if sb == nil {
			logrus.Errorf("Unexpectedly failed to get sandbox %s for container %s", c.Sandbox(), c.ID())
			continue
		}

		if stat := ss.statsForContainer(c, sb); stat != nil {
			stats = append(stats, stat)
		}
	}
	return stats
}

// statsForContainer is an internal, non-locking version of StatsForContainer
// that returns (and occasionally gathers) the stats for the given container.
func (ss *StatsServer) statsForContainer(c *oci.Container, sb *sandbox.Sandbox) *types.ContainerStats {
	if ss.collectionPeriod == 0 {
		return ss.updateContainer(c, sb)
	}
	ctrStat, ok := ss.ctrStats[c.ID()]
	if ok {
		return ctrStat
	}
	return ss.updateContainer(c, sb)
}

// RemoveStatsForContainer removes the saved entry for the specified container
// to prevent the map from always growing.
func (ss *StatsServer) RemoveStatsForContainer(c *oci.Container) {
	ss.Lock()
	defer ss.Unlock()
	delete(ss.ctrStats, c.ID())
}

// Shutdown tells the updateLoop to stop updating.
func (ss *StatsServer) Shutdown() {
	close(ss.shutdown)
}
