Skip to content

Commit fa415b0

Browse files
authored
Create base error type for ingester per-instance errors and remove logging for them (grafana#5585)
This allows us to decorate them with extra information for gRPC responses and our logging middleware (to prevent them from being logged, which is expensive). Related: grafana#5581, weaveworks/common#299.
1 parent 3a622c5 commit fa415b0

File tree

6 files changed

+107
-23
lines changed

6 files changed

+107
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* [CHANGE] Querier: Renamed `-querier.prefer-streaming-chunks` to `-querier.prefer-streaming-chunks-from-ingesters` to enable streaming chunks from ingesters to queriers. #5182
1111
* [CHANGE] Querier: `-query-frontend.cache-unaligned-requests` has been moved from a global flag to a per-tenant override. #5312
1212
* [CHANGE] Ingester: removed `cortex_ingester_shipper_dir_syncs_total` and `cortex_ingester_shipper_dir_sync_failures_total` metrics. The former metric was not very useful, and the latter was never incremented. #5396
13+
* [CHANGE] Ingester: Do not log errors related to hitting per-instance limits to reduce resource usage when ingesters are under pressure. #5585
1314
* [CHANGE] gRPC clients: use default connect timeout of 5s, and therefore enable default connect backoff max delay of 5s. #5562
1415
* [FEATURE] Cardinality API: Add a new `count_method` parameter which enables counting active series #5136
1516
* [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ require (
8484
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 // indirect
8585
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
8686
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
87-
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
8887
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
8988
github.com/go-test/deep v1.1.0 // indirect
9089
github.com/hashicorp/go-retryablehttp v0.7.4 // indirect
@@ -100,6 +99,7 @@ require (
10099
cloud.google.com/go/compute/metadata v0.2.3 // indirect
101100
cloud.google.com/go/iam v1.1.1 // indirect
102101
github.com/DmitriyVTitov/size v1.5.0 // indirect
102+
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
103103
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
104104
github.com/armon/go-metrics v0.4.1 // indirect
105105
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect

pkg/ingester/ingester.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,11 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
760760

761761
db, err := i.getOrCreateTSDB(userID, false)
762762
if err != nil {
763+
// Check for a particular per-instance limit and return that error directly
764+
// since it contains extra information for gRPC and our logging middleware.
765+
if errors.Is(err, errMaxTenantsReached) {
766+
return nil, err
767+
}
763768
return nil, wrapWithUser(err, userID)
764769
}
765770

@@ -805,7 +810,12 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, pushReq *push.Request) (
805810
level.Warn(i.logger).Log("msg", "failed to rollback appender on error", "user", userID, "err", err)
806811
}
807812

808-
return nil, err
813+
// Check for a particular per-instance limit and return that error directly
814+
// since it contains extra information for gRPC and our logging middleware.
815+
if errors.Is(err, errMaxInMemorySeriesReached) {
816+
return nil, err
817+
}
818+
return nil, wrapWithUser(err, userID)
809819
}
810820

811821
// At this point all samples have been added to the appender, so we can track the time it took.
@@ -1038,7 +1048,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
10381048
}
10391049

10401050
// Otherwise, return a 500.
1041-
return wrapWithUser(err, userID)
1051+
return err
10421052
}
10431053

10441054
numNativeHistogramBuckets := -1
@@ -1079,7 +1089,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
10791089
continue
10801090
}
10811091

1082-
return wrapWithUser(err, userID)
1092+
return err
10831093
}
10841094
numNativeHistograms := len(ts.Histograms)
10851095
if numNativeHistograms > 0 {

pkg/ingester/ingester_test.go

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ import (
5353
"golang.org/x/exp/slices"
5454
"golang.org/x/sync/errgroup"
5555
"google.golang.org/grpc"
56+
"google.golang.org/grpc/codes"
57+
"google.golang.org/grpc/status"
5658

5759
"github.com/grafana/mimir/pkg/ingester/activeseries"
5860
"github.com/grafana/mimir/pkg/ingester/client"
@@ -63,7 +65,6 @@ import (
6365
"github.com/grafana/mimir/pkg/storage/tsdb/block"
6466
"github.com/grafana/mimir/pkg/usagestats"
6567
"github.com/grafana/mimir/pkg/util"
66-
util_log "github.com/grafana/mimir/pkg/util/log"
6768
util_math "github.com/grafana/mimir/pkg/util/math"
6869
"github.com/grafana/mimir/pkg/util/push"
6970
util_test "github.com/grafana/mimir/pkg/util/test"
@@ -5415,10 +5416,9 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) {
54155416

54165417
func TestIngester_PushInstanceLimits(t *testing.T) {
54175418
tests := map[string]struct {
5418-
limits InstanceLimits
5419-
reqs map[string][]*mimirpb.WriteRequest
5420-
expectedErr error
5421-
expectedErrType interface{}
5419+
limits InstanceLimits
5420+
reqs map[string][]*mimirpb.WriteRequest
5421+
expectedErr error
54225422
}{
54235423
"should succeed creating one user and series": {
54245424
limits: InstanceLimits{MaxInMemorySeries: 1, MaxInMemoryTenants: 1},
@@ -5461,7 +5461,7 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
54615461
},
54625462
},
54635463

5464-
expectedErr: wrapWithUser(errMaxInMemorySeriesReached, "test"),
5464+
expectedErr: errMaxInMemorySeriesReached,
54655465
},
54665466

54675467
"should fail creating two users": {
@@ -5488,7 +5488,7 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
54885488
),
54895489
},
54905490
},
5491-
expectedErr: wrapWithUser(errMaxTenantsReached, "user2"),
5491+
expectedErr: errMaxTenantsReached,
54925492
},
54935493

54945494
"should fail pushing samples in two requests due to rate limit": {
@@ -5557,9 +5557,12 @@ func TestIngester_PushInstanceLimits(t *testing.T) {
55575557
} else {
55585558
// Last push may expect error.
55595559
if testData.expectedErr != nil {
5560-
assert.Equal(t, testData.expectedErr, err)
5561-
} else if testData.expectedErrType != nil {
5562-
assert.True(t, errors.As(err, testData.expectedErrType), "expected error type %T, got %v", testData.expectedErrType, err)
5560+
assert.ErrorIs(t, err, testData.expectedErr)
5561+
var optional middleware.OptionalLogging
5562+
assert.ErrorAs(t, err, &optional)
5563+
s, ok := status.FromError(err)
5564+
require.True(t, ok, "expected to be able to convert to gRPC status")
5565+
assert.Equal(t, codes.Unavailable, s.Code())
55635566
} else {
55645567
assert.NoError(t, err)
55655568
}
@@ -5687,10 +5690,17 @@ func TestIngester_inflightPushRequests(t *testing.T) {
56875690

56885691
time.Sleep(10 * time.Millisecond) // Give first goroutine a chance to start pushing...
56895692
req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1, 1024)
5693+
var optional middleware.OptionalLogging
56905694

56915695
_, err := i.Push(ctx, req)
5692-
require.Equal(t, errMaxInflightRequestsReached, err)
5693-
require.ErrorAs(t, err, &util_log.DoNotLogError{})
5696+
require.ErrorIs(t, err, errMaxInflightRequestsReached)
5697+
require.ErrorAs(t, err, &optional)
5698+
require.False(t, optional.ShouldLog(ctx, time.Duration(0)), "expected not to log via .ShouldLog()")
5699+
5700+
s, ok := status.FromError(err)
5701+
require.True(t, ok, "expected to be able to convert to gRPC status")
5702+
require.Equal(t, codes.Unavailable, s.Code())
5703+
56945704
return nil
56955705
})
56965706

pkg/ingester/instance_limits.go

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
package ingester
77

88
import (
9+
"context"
910
"flag"
11+
"time"
1012

11-
"github.com/pkg/errors"
13+
"google.golang.org/grpc/codes"
14+
"google.golang.org/grpc/status"
1215
"gopkg.in/yaml.v3"
1316

1417
"github.com/grafana/mimir/pkg/util/globalerror"
15-
util_log "github.com/grafana/mimir/pkg/util/log"
1618
)
1719

1820
const (
@@ -22,14 +24,41 @@ const (
2224
maxInflightPushRequestsFlag = "ingester.instance-limits.max-inflight-push-requests"
2325
)
2426

27+
// We don't include values in the messages for per-instance limits to avoid leaking Mimir cluster configuration to users.
2528
var (
26-
// We don't include values in the message to avoid leaking Mimir cluster configuration to users.
27-
errMaxIngestionRateReached = errors.New(globalerror.IngesterMaxIngestionRate.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the samples ingestion rate limit", maxIngestionRateFlag))
28-
errMaxTenantsReached = errors.New(globalerror.IngesterMaxTenants.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of tenants", maxInMemoryTenantsFlag))
29-
errMaxInMemorySeriesReached = errors.New(globalerror.IngesterMaxInMemorySeries.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of in-memory series", maxInMemorySeriesFlag))
30-
errMaxInflightRequestsReached = util_log.DoNotLogError{Err: errors.New(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))}
29+
errMaxIngestionRateReached = newInstanceLimitError(globalerror.IngesterMaxIngestionRate.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the samples ingestion rate limit", maxIngestionRateFlag))
30+
errMaxTenantsReached = newInstanceLimitError(globalerror.IngesterMaxTenants.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of tenants", maxInMemoryTenantsFlag))
31+
errMaxInMemorySeriesReached = newInstanceLimitError(globalerror.IngesterMaxInMemorySeries.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of in-memory series", maxInMemorySeriesFlag))
32+
errMaxInflightRequestsReached = newInstanceLimitError(globalerror.IngesterMaxInflightPushRequests.MessageWithPerInstanceLimitConfig("the write request has been rejected because the ingester exceeded the allowed number of inflight push requests", maxInflightPushRequestsFlag))
3133
)
3234

35+
type instanceLimitErr struct {
36+
msg string
37+
status *status.Status
38+
}
39+
40+
func newInstanceLimitError(msg string) error {
41+
return &instanceLimitErr{
42+
// Errors from hitting per-instance limits are always "unavailable" for gRPC
43+
status: status.New(codes.Unavailable, msg),
44+
msg: msg,
45+
}
46+
}
47+
48+
func (e *instanceLimitErr) ShouldLog(context.Context, time.Duration) bool {
49+
// We increment metrics when hitting per-instance limits and so there's no need to
50+
// log them, the error doesn't contain any interesting information for us.
51+
return false
52+
}
53+
54+
func (e *instanceLimitErr) GRPCStatus() *status.Status {
55+
return e.status
56+
}
57+
58+
func (e *instanceLimitErr) Error() string {
59+
return e.msg
60+
}
61+
3362
// InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return
3463
// (internal) error.
3564
type InstanceLimits struct {

pkg/ingester/instance_limits_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,16 @@
66
package ingester
77

88
import (
9+
"context"
10+
"fmt"
911
"strings"
1012
"testing"
13+
"time"
1114

1215
"github.com/stretchr/testify/require"
16+
"github.com/weaveworks/common/middleware"
17+
"google.golang.org/grpc/codes"
18+
"google.golang.org/grpc/status"
1319
"gopkg.in/yaml.v3"
1420
)
1521

@@ -34,3 +40,31 @@ max_tenants: 50000
3440
require.Equal(t, int64(30), l.MaxInMemorySeries) // default value
3541
require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value
3642
}
43+
44+
func TestInstanceLimitErr(t *testing.T) {
45+
t.Run("bare error implements ShouldLog()", func(t *testing.T) {
46+
var optional middleware.OptionalLogging
47+
require.ErrorAs(t, errMaxInflightRequestsReached, &optional)
48+
require.False(t, optional.ShouldLog(context.Background(), time.Duration(0)))
49+
})
50+
51+
t.Run("wrapped error implements ShouldLog()", func(t *testing.T) {
52+
err := fmt.Errorf("%w: oh no", errMaxTenantsReached)
53+
var optional middleware.OptionalLogging
54+
require.ErrorAs(t, err, &optional)
55+
require.False(t, optional.ShouldLog(context.Background(), time.Duration(0)))
56+
})
57+
58+
t.Run("bare error implements GRPCStatus()", func(t *testing.T) {
59+
s, ok := status.FromError(errMaxInMemorySeriesReached)
60+
require.True(t, ok, "expected to be able to convert to gRPC status")
61+
require.Equal(t, codes.Unavailable, s.Code())
62+
})
63+
64+
t.Run("wrapped error implements GRPCStatus()", func(t *testing.T) {
65+
err := fmt.Errorf("%w: oh no", errMaxIngestionRateReached)
66+
s, ok := status.FromError(err)
67+
require.True(t, ok, "expected to be able to convert to gRPC status")
68+
require.Equal(t, codes.Unavailable, s.Code())
69+
})
70+
}

0 commit comments

Comments
 (0)