
Commit 61df085

ashwanthgoli, chaudum, and owen-d authored
chore(block-builder): update block builder to use kafka clients directly (grafana#15433)
Co-authored-by: Christian Haudum <[email protected]>
Co-authored-by: Owen Diehl <[email protected]>
1 parent 5bec7d2 commit 61df085
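
This change removes the partition.Reader/readerFactory indirection: each block-builder worker now builds its own franz-go (kgo) client and consumes its job's offset range directly, and the worker goroutines are coordinated with an errgroup instead of a sync.WaitGroup. Below is a rough, hypothetical sketch of that worker shape, not the committed code: the broker address is a placeholder and job handling is elided, since the real workers build clients through Loki's pkg/kafka/client wrapper (with per-worker metrics) and fetch jobs from the scheduler, as the diff below shows.

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"golang.org/x/sync/errgroup"
)

func runWorkers(ctx context.Context, parallelism int) error {
	errgrp, ctx := errgroup.WithContext(ctx)
	for j := 0; j < parallelism; j++ {
		workerID := fmt.Sprintf("block-builder-worker-%d", j)
		errgrp.Go(func() error {
			// One Kafka client per worker, so concurrent jobs never share
			// consumption state.
			c, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
			if err != nil {
				return err
			}
			defer c.Close()

			// A real worker would now poll the scheduler for jobs and process
			// them with c; see runOne/processJob in the diff below.
			_ = workerID
			<-ctx.Done()
			return nil
		})
	}
	return errgrp.Wait()
}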

2 files changed: 71 additions, 61 deletions


pkg/blockbuilder/builder/builder.go

Lines changed: 70 additions & 50 deletions
@@ -15,11 +15,13 @@ import (
 	"github.com/grafana/dskit/grpcclient"
 	"github.com/grafana/dskit/services"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/twmb/franz-go/pkg/kgo"
+	"golang.org/x/sync/errgroup"

 	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
 	"github.com/grafana/loki/v3/pkg/compression"
 	"github.com/grafana/loki/v3/pkg/kafka"
-	"github.com/grafana/loki/v3/pkg/kafka/partition"
+	"github.com/grafana/loki/v3/pkg/kafka/client"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/storage/chunk"
 	"github.com/grafana/loki/v3/pkg/storage/config"
@@ -112,13 +114,14 @@ type BlockBuilder struct {

 	id              string
 	cfg             Config
+	kafkaCfg        kafka.Config
 	periodicConfigs []config.PeriodConfig
-	metrics         *builderMetrics
-	logger          log.Logger

-	decoder       *kafka.Decoder
-	readerFactory func(partition int32) (partition.Reader, error)
+	metrics    *builderMetrics
+	logger     log.Logger
+	registerer prometheus.Registerer

+	decoder  *kafka.Decoder
 	store    stores.ChunkWriter
 	objStore *MultiStore

@@ -129,34 +132,36 @@ type BlockBuilder struct {
 func NewBlockBuilder(
 	id string,
 	cfg Config,
+	kafkaCfg kafka.Config,
 	periodicConfigs []config.PeriodConfig,
-	readerFactory func(partition int32) (partition.Reader, error),
 	store stores.ChunkWriter,
 	objStore *MultiStore,
 	logger log.Logger,
-	reg prometheus.Registerer,
+	registerer prometheus.Registerer,
 ) (*BlockBuilder,
 	error) {
 	decoder, err := kafka.NewDecoder()
 	if err != nil {
 		return nil, err
 	}

-	t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, reg)
+	t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, registerer)
 	if err != nil {
 		return nil, fmt.Errorf("create grpc transport: %w", err)
 	}

 	i := &BlockBuilder{
 		id:              id,
 		cfg:             cfg,
+		kafkaCfg:        kafkaCfg,
 		periodicConfigs: periodicConfigs,
-		metrics:         newBuilderMetrics(reg),
+		metrics:         newBuilderMetrics(registerer),
 		logger:          logger,
+		registerer:      registerer,
 		decoder:         decoder,
-		readerFactory:   readerFactory,
 		store:           store,
 		objStore:        objStore,
+		inflightJobs:    make(map[string]*types.Job),
 		BuilderTransport: t,
 	}

@@ -165,20 +170,26 @@ func NewBlockBuilder(
 }

 func (i *BlockBuilder) running(ctx context.Context) error {
-	wg := sync.WaitGroup{}
-
+	errgrp, ctx := errgroup.WithContext(ctx)
 	for j := 0; j < i.cfg.WorkerParallelism; j++ {
-		wg.Add(1)
-		go func(id string) {
-			defer wg.Done()
+		workerID := fmt.Sprintf("block-builder-worker-%d", j)
+		errgrp.Go(func() error {
+			c, err := client.NewReaderClient(
+				i.kafkaCfg,
+				client.NewReaderClientMetrics(workerID, i.registerer),
+				log.With(i.logger, "component", workerID),
+			)
+			if err != nil {
+				return err
+			}

 			var waitFor time.Duration
 			for {
 				select {
 				case <-ctx.Done():
-					return
+					return nil
 				case <-time.After(waitFor):
-					gotJob, err := i.runOne(ctx, id)
+					gotJob, err := i.runOne(ctx, c, workerID)
 					if err != nil {
 						level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
 					}
@@ -191,30 +202,27 @@ func (i *BlockBuilder) running(ctx context.Context) error {
 					}
 				}
 			}
-		}(fmt.Sprintf("worker-%d", j))
-	}

-	wg.Add(1)
-	go func() {
-		defer wg.Done()
+		})
+	}

+	errgrp.Go(func() error {
 		ticker := time.NewTicker(i.cfg.SyncInterval)
 		defer ticker.Stop()

 		for {
 			select {
 			case <-ctx.Done():
-				return
+				return nil
 			case <-ticker.C:
 				if err := i.syncJobs(ctx); err != nil {
 					level.Error(i.logger).Log("msg", "failed to sync jobs", "err", err)
 				}
 			}
 		}
-	}()
+	})

-	wg.Wait()
-	return nil
+	return errgrp.Wait()
 }

 func (i *BlockBuilder) syncJobs(ctx context.Context) error {
@@ -233,7 +241,7 @@ func (i *BlockBuilder) syncJobs(ctx context.Context) error {
 	return nil
 }

-func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error) {
+func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID string) (bool, error) {
 	// assuming GetJob blocks/polls until a job is available
 	resp, err := i.SendGetJobRequest(ctx, &types.GetJobRequest{
 		BuilderID: workerID,
@@ -266,7 +274,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
 		Job:     job,
 		Success: true,
 	}
-	if _, err = i.processJob(ctx, job, logger); err != nil {
+	if _, err = i.processJob(ctx, c, job, logger); err != nil {
 		level.Error(i.logger).Log("msg", "failed to process job", "err", err)
 		completion.Success = false
 	}
@@ -292,7 +300,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
 	return true, err
 }

-func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
+func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
 	level.Debug(logger).Log("msg", "beginning job")

 	indexer := newTsdbCreator()
@@ -316,7 +324,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
 		"load records",
 		1,
 		func(ctx context.Context) error {
-			lastOffset, err = i.loadRecords(ctx, job.Partition(), job.Offsets(), inputCh)
+			lastOffset, err = i.loadRecords(ctx, c, job.Partition(), job.Offsets(), inputCh)
 			return err
 		},
 		func(ctx context.Context) error {
@@ -502,47 +510,59 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
 	return lastOffset, nil
 }

-func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
-	f, err := i.readerFactory(partitionID)
-	if err != nil {
-		return 0, err
-	}
-
-	f.SetOffsetForConsumption(offsets.Min)
+func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
+	// Use NoResetOffset to avoid resetting the offset to the beginning of the partition when the requested offset is out of range.
+	// This could happen if the requested records are already outside the retention period. We should fail the job in such cases, leaving the scheduler to make a decision.
+	c.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+		i.kafkaCfg.Topic: {partitionID: kgo.NoResetOffset().At(offsets.Min)},
+	})
+	defer c.RemoveConsumePartitions(map[string][]int32{
+		i.kafkaCfg.Topic: {partitionID},
+	})

 	var (
-		lastOffset = offsets.Min - 1
-		boff       = backoff.New(ctx, i.cfg.Backoff)
+		lastConsumedOffset = offsets.Min - 1
+		lastSeenOffset     = offsets.Min - 1
+		boff               = backoff.New(ctx, i.cfg.Backoff)
 	)

-	for lastOffset < offsets.Max && boff.Ongoing() {
-		var records []partition.Record
-		records, err = f.Poll(ctx, int(offsets.Max-lastOffset))
-		if err != nil {
+	for lastSeenOffset < offsets.Max && boff.Ongoing() {
+		if err := context.Cause(ctx); err != nil {
+			return 0, err
+		}
+
+		fs := c.PollRecords(ctx, int(offsets.Max-lastConsumedOffset))
+		// TODO: better error handling for non-retryable errors
+		// we don't have to iterate over all errors since we only fetch a single partition
+		if err := fs.Err(); err != nil {
+			level.Error(i.logger).Log("msg", "failed to poll records", "err", err)
 			boff.Wait()
 			continue
 		}

-		if len(records) == 0 {
+		if fs.Empty() {
 			// No more records available
 			break
 		}

 		// Reset backoff on successful poll
 		boff.Reset()

-		converted := make([]AppendInput, 0, len(records))
-		for _, record := range records {
+		converted := make([]AppendInput, 0, fs.NumRecords())
+		for iter := fs.RecordIter(); !iter.Done(); {
+			record := iter.Next()
+			lastSeenOffset = record.Offset
 			if record.Offset >= offsets.Max {
 				level.Debug(i.logger).Log("msg", "record offset exceeds job max offset. stop processing", "record offset", record.Offset, "max offset", offsets.Max)
 				break
 			}
-			lastOffset = record.Offset

-			stream, labels, err := i.decoder.Decode(record.Content)
+			stream, labels, err := i.decoder.Decode(record.Value)
 			if err != nil {
 				return 0, fmt.Errorf("failed to decode record: %w", err)
 			}
+
+			lastConsumedOffset = record.Offset
 			if len(stream.Entries) == 0 {
 				continue
 			}
@@ -552,7 +572,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
 			copy(entries, stream.Entries)

 			converted = append(converted, AppendInput{
-				tenant:    record.TenantID,
+				tenant:    string(record.Key),
 				labels:    labels,
 				labelsStr: stream.Labels,
 				entries:   entries,
@@ -568,7 +588,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
 		}
 	}

-	return lastOffset, boff.Err()
+	return lastConsumedOffset, boff.Err()
 }

 func withBackoff[T any](
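
The new loadRecords drives the kgo.Client directly: it pins the job's partition at the requested start offset with kgo.NoResetOffset() (so an offset that has already aged out of retention fails the job instead of silently restarting from the partition's beginning) and polls until it has seen the job's max offset, tracking lastSeenOffset for loop progress and lastConsumedOffset for what was actually handed downstream. A condensed, hypothetical sketch of that loop, with placeholder topic and offsets, and the decoding, backoff, and channel plumbing omitted:

package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func consumeRange(ctx context.Context, c *kgo.Client, topic string, partition int32, min, max int64) (int64, error) {
	// Fail fast instead of resetting to the partition start if min is out of range.
	c.AddConsumePartitions(map[string]map[int32]kgo.Offset{
		topic: {partition: kgo.NoResetOffset().At(min)},
	})
	defer c.RemoveConsumePartitions(map[string][]int32{topic: {partition}})

	// lastSeen tracks every record the broker returned; lastConsumed only
	// those inside [min, max), which is what the caller reports as progress.
	lastConsumed, lastSeen := min-1, min-1
	for lastSeen < max {
		fs := c.PollRecords(ctx, int(max-lastConsumed))
		if err := fs.Err(); err != nil {
			return lastConsumed, err
		}
		if fs.Empty() {
			break // partition exhausted before reaching max
		}
		for iter := fs.RecordIter(); !iter.Done(); {
			rec := iter.Next()
			lastSeen = rec.Offset
			if rec.Offset >= max {
				break // job boundary reached
			}
			// In the committed code the tenant comes from rec.Key and the
			// log entries are decoded from rec.Value.
			lastConsumed = rec.Offset
		}
	}
	return lastConsumed, nil
}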

pkg/loki/modules.go

Lines changed: 1 addition & 11 deletions
@@ -1836,21 +1836,11 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
 		return nil, err
 	}

-	readerMetrics := partition.NewReaderMetrics(prometheus.DefaultRegisterer)
-	readerFactory := func(partitionID int32) (partition.Reader, error) {
-		return partition.NewKafkaReader(
-			t.Cfg.KafkaConfig,
-			partitionID,
-			logger,
-			readerMetrics,
-		)
-	}
-
 	bb, err := blockbuilder.NewBlockBuilder(
 		id,
 		t.Cfg.BlockBuilder,
+		t.Cfg.KafkaConfig,
 		t.Cfg.SchemaConfig.Configs,
-		readerFactory,
 		t.Store,
 		objectStore,
 		logger,
