7 changes: 3 additions & 4 deletions receiver/googlecloudspannerreceiver/config.go
@@ -6,6 +6,7 @@ package googlecloudspannerreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudspannerreceiver"
import (
"errors"
"fmt"
"slices"

"go.opentelemetry.io/collector/scraper/scraperhelper"
)
@@ -94,10 +95,8 @@ func (instance Instance) Validate() error {
return errors.New("field \"databases\" is required and cannot be empty for instance configuration")
}

for _, database := range instance.Databases {
if database == "" {
return errors.New("field \"databases\" contains empty database names")
}
if slices.Contains(instance.Databases, "") {
return errors.New("field \"databases\" contains empty database names")
}

return nil
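
Note: slices.Contains is the Go 1.21+ standard-library helper replacing the hand-rolled loop above. A minimal standalone illustration of the behavior the validation relies on (not part of the diff; the database names are made up):

package main

import (
	"fmt"
	"slices"
)

func main() {
	// slices.Contains reports whether a value occurs in the slice, so one
	// empty string anywhere in Databases fails validation.
	fmt.Println(slices.Contains([]string{"db1", "", "db2"}, "")) // true
	fmt.Println(slices.Contains([]string{"db1", "db2"}, ""))     // false
}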
@@ -23,6 +23,7 @@ type ItemFilter interface {
Shutdown() error
TotalLimit() int
LimitByTimestamp() int
StartCache()
}

type ItemFilterResolver interface {
Expand All @@ -38,6 +39,8 @@ type itemCardinalityFilter struct {
logger *zap.Logger
cache *ttlcache.Cache[string, struct{}]
stopOnce sync.Once
startOnce sync.Once
wg sync.WaitGroup
}

type currentLimitByTimestamp struct {
@@ -63,7 +66,6 @@ func NewItemCardinalityFilter(metricName string, totalLimit, limitByTimestamp int
ttlcache.WithCapacity[string, struct{}](uint64(totalLimit)),
ttlcache.WithDisableTouchOnHit[string, struct{}](),
)
go cache.Start()

return &itemCardinalityFilter{
metricName: metricName,
@@ -95,6 +97,25 @@ func (f *itemCardinalityFilter) Filter(sourceItems []*Item) []*Item {
return filteredItems
}

// StartCache explicitly starts the TTL cache cleanup loop.
// Idempotent: safe to call multiple times.
func (f *itemCardinalityFilter) StartCache() {
f.startOnce.Do(func() {
f.wg.Add(1)
go func() {
defer f.wg.Done()
f.cache.Start()
}()
})
}
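
A minimal lifecycle sketch for the new method (illustrative only: it assumes NewItemCardinalityFilter keeps its existing signature with an item activity period and a zap logger, and the variable names are hypothetical):

	var items []*Item // previously collected data points
	f, err := NewItemCardinalityFilter("my.metric", 100, 10, time.Minute, logger)
	if err != nil {
		return err
	}
	f.StartCache()                      // begin TTL eviction; safe to call repeatedly
	defer func() { _ = f.Shutdown() }() // stops the cache and waits for the loop
	filtered := f.Filter(items)         // filtering works even before the loop starts
	_ = filtered

The design choice here: moving cache.Start() out of the constructor and behind StartCache lets callers construct filters without spawning goroutines they never clean up.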

func (f *itemCardinalityFilter) filterItems(items []*Item) []*Item {
limit := currentLimitByTimestamp{
limitByTimestamp: f.limitByTimestamp,
@@ -133,7 +154,20 @@ func (f *itemCardinalityFilter) canIncludeNewItem(currentLimitByTimestamp int) bool {
}

func (f *itemCardinalityFilter) Shutdown() error {
f.stopOnce.Do(func() { f.cache.Stop() })
f.stopOnce.Do(func() {
stopped := make(chan struct{})
go func() {
f.cache.Stop()
f.wg.Wait()
close(stopped)
}()

select {
case <-stopped:
case <-time.After(5 * time.Second):
f.logger.Warn("Timeout waiting for ttlcache shutdown", zap.String("metric", f.metricName))
}
})
return nil
}
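
A hypothetical test sketch (not part of this change) for the guarantees the two sync.Once guards provide; the constructor arguments are assumed, and zaptest is go.uber.org/zap/zaptest:

	func TestStartCacheAndShutdownAreIdempotent(t *testing.T) {
		f, err := NewItemCardinalityFilter("my.metric", 10, 5, time.Minute, zaptest.NewLogger(t))
		require.NoError(t, err)

		f.StartCache()
		f.StartCache() // no-op: guarded by startOnce

		require.NoError(t, f.Shutdown())
		require.NoError(t, f.Shutdown()) // no-op: guarded by stopOnce
	}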

@@ -122,6 +122,9 @@ func TestItemCardinalityFilter_Filter(t *testing.T) {
items = additionalTestData(t)
filteredItems = filter.Filter(items)

// Start TTL eviction loop
filterCasted.StartCache()

// Cache timeout hasn't been reached, so all items were filtered out
assert.Empty(t, filteredItems)

@@ -170,6 +173,9 @@ func TestItemCardinalityFilter_FilterItems(t *testing.T) {
// Cache timeout hasn't been reached, so no more new items expected
assert.Len(t, filteredItems, totalLimit)

// Start TTL eviction loop
filterCasted.StartCache()

// Doing this to avoid relying on timeouts and sleeps (prevents potentially flaky tests)
syncChannel := make(chan bool, 10)

@@ -44,3 +44,5 @@ func (r *nopItemFilterResolver) Resolve(string) (ItemFilter, error) {
func (*nopItemFilterResolver) Shutdown() error {
return nil
}

func (*nopItemCardinalityFilter) StartCache() {}
@@ -37,7 +37,7 @@ const (
func assertGroupedByKey(t *testing.T, items []*Item, groupedItems map[time.Time][]*Item, key time.Time, offsetInItems int) {
assert.Len(t, groupedItems[key], 3)

for i := 0; i < 3; i++ {
for i := range 3 {
assert.Equal(t, items[i+offsetInItems].SeriesKey, groupedItems[key][i].SeriesKey)
}
}
@@ -5,6 +5,7 @@ package filterfactory

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -81,6 +82,18 @@ func TestFilterBuilder_BuildFilterByMetricPositiveTotalLimit(t *testing.T) {
}
require.NoError(t, err)

t.Cleanup(func() {
for _, f := range result {
_ = f.Shutdown()
}
// Give Shutdown() time to settle before the test exits. A stricter check
// (e.g. goleak, or asserting the cache is empty) could replace this short
// Eventually-based wait.
require.EventuallyWithT(t, func(_ *assert.CollectT) {}, 100*time.Millisecond, 10*time.Millisecond)
})

// Because we have 2 groups and each group has 2 metrics
assert.Len(t, result, len(testCase.metricPrefixes)*2)
for _, metadataItem := range metadataItems {
@@ -96,7 +109,6 @@
assert.Equal(t, expectedLimit, f.TotalLimit())
assert.Equal(t, expectedLimit, f.LimitByTimestamp())
}
assert.NoError(t, f.Shutdown())
}
}
})
@@ -137,6 +149,18 @@ func TestFilterBuilder_HandleLowCardinalityGroups(t *testing.T) {
remainingTotalLimit, err := builder.handleLowCardinalityGroups(metadataItems, testCase.totalLimit, filterByMetric)
require.NoError(t, err)

t.Cleanup(func() {
for _, f := range filterByMetric {
_ = f.Shutdown()
}
// Give Shutdown() time to settle before the test exits. A stricter check
// (e.g. goleak, or asserting the cache is empty) could replace this short
// Eventually-based wait.
require.EventuallyWithT(t, func(_ *assert.CollectT) {}, 100*time.Millisecond, 10*time.Millisecond)
})

// Because we have 2 groups and each group has 2 metrics
assert.Len(t, filterByMetric, len(testCase.metricPrefixes)*2)
for _, metadataItem := range metadataItems {
@@ -148,7 +172,6 @@
assert.Equal(t, expectedLimit, f.TotalLimit())
assert.Equal(t, expectedLimit, f.LimitByTimestamp())
assert.Equal(t, testCase.expectedRemainingTotalLimit, remainingTotalLimit)
assert.NoError(t, f.Shutdown())
}
}
})
@@ -193,6 +216,18 @@ func TestFilterBuilder_HandleHighCardinalityGroups(t *testing.T) {
}
require.NoError(t, err)

t.Cleanup(func() {
for _, f := range filterByMetric {
_ = f.Shutdown()
}
// Give Shutdown() time to settle before the test exits. A stricter check
// (e.g. goleak, or asserting the cache is empty) could replace this short
// Eventually-based wait.
require.EventuallyWithT(t, func(_ *assert.CollectT) {}, 100*time.Millisecond, 10*time.Millisecond)
})

// Because we have 2 groups and each group has 2 metrics
assert.Len(t, filterByMetric, len(testCase.metricPrefixes)*2)
for _, metadataItem := range metadataItems {
@@ -202,7 +237,6 @@
assert.Equal(t, testCase.expectedHighCardinalityTotalLimit, f.TotalLimit())
assert.Equal(t, testCase.expectedHighCardinalityLimitByTimestamp, f.LimitByTimestamp())
assert.Equal(t, testCase.expectedRemainingTotalLimit, remainingTotalLimit)
assert.NoError(t, f.Shutdown())
}
}
})
@@ -228,6 +262,18 @@ func TestFilterBuilder_TestConstructFiltersForGroups(t *testing.T) {
remainingTotalLimit, filterByMetric)
require.NoError(t, err)

t.Cleanup(func() {
for _, f := range filterByMetric {
_ = f.Shutdown()
}
// Give Shutdown() time to settle before the test exits. A stricter check
// (e.g. goleak, or asserting the cache is empty) could replace this short
// Eventually-based wait.
require.EventuallyWithT(t, func(_ *assert.CollectT) {}, 100*time.Millisecond, 10*time.Millisecond)
})

// Because we have 2 groups and each group has 2 metrics
assert.Len(t, filterByMetric, len(metricPrefixes)*2)
for _, metadataItem := range metadataItems {
@@ -237,7 +283,6 @@
assert.Equal(t, totalLimitPerMetric, f.TotalLimit())
assert.Equal(t, limitPerMetricByTimestamp, f.LimitByTimestamp())
assert.Equal(t, expectedRemainingTotalLimit, result)
assert.NoError(t, f.Shutdown())
}
}
}
@@ -89,3 +89,13 @@ func (f *itemFilterFactory) Shutdown() error {

return nil
}

// StartAllCaches starts the TTL caches for all filters inside the resolver.
// It is a no-op if the resolver is not the concrete factory from this package.
func StartAllCaches(resolver filter.ItemFilterResolver) {
if f, ok := resolver.(*itemFilterFactory); ok {
for _, it := range f.filterByMetric {
it.StartCache()
}
}
}
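
StartAllCaches takes the exported filter.ItemFilterResolver interface and type-asserts to the concrete factory, so the interface stays unchanged and resolvers from other packages are simply left alone. A sketch of the intended call site, mirroring the receiver change below (the constructor name is assumed):

	resolver, err := filterfactory.NewItemFilterResolver(logger, factoryConfig) // assumed constructor
	if err != nil {
		return err
	}
	filterfactory.StartAllCaches(resolver) // no-op for resolvers from other packages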
@@ -38,6 +38,8 @@ func (*mockFilter) LimitByTimestamp() int {
return 0
}

func (*mockFilter) StartCache() {}

func generateMetadataItems(prefixes []string, prefixHighCardinality []bool) []*metadata.MetricsMetadata {
metricDataType := metadata.NewMetricType(pmetric.MetricTypeGauge, pmetric.AggregationTemporalityUnspecified, false)
metadataItems := make([]*metadata.MetricsMetadata, len(prefixes))
@@ -68,7 +68,7 @@ func TestPullTimestampsWithDifference(t *testing.T) {

expectedTimestamp = lowerBound.Add(time.Minute)

for i := 0; i < expectedAmountOfTimestamps; i++ {
for i := range expectedAmountOfTimestamps {
assert.Equal(t, expectedTimestamp, timestamps[i])
expectedTimestamp = expectedTimestamp.Add(time.Minute)
}
2 changes: 2 additions & 0 deletions receiver/googlecloudspannerreceiver/receiver.go
@@ -158,6 +158,8 @@ func (r *googleCloudSpannerReceiver) initializeMetricsBuilder(parsedMetadata []*metadata.MetricsMetadata) error {
return err
}

filterfactory.StartAllCaches(itemFilterResolver)

r.metricsBuilder = metadata.NewMetricsFromDataPointBuilder(itemFilterResolver)

return nil