@@ -15,11 +15,13 @@ import (
 	"github.com/grafana/dskit/grpcclient"
 	"github.com/grafana/dskit/services"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/twmb/franz-go/pkg/kgo"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
 	"github.com/grafana/loki/v3/pkg/compression"
 	"github.com/grafana/loki/v3/pkg/kafka"
-	"github.com/grafana/loki/v3/pkg/kafka/partition"
+	"github.com/grafana/loki/v3/pkg/kafka/client"
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/storage/chunk"
 	"github.com/grafana/loki/v3/pkg/storage/config"
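
These import changes replace Loki's internal `partition` reader with the franz-go client package (`kgo`) and add `errgroup` for worker management. For orientation, here is a minimal sketch of what a bare franz-go reader client amounts to; `client.NewReaderClient` is Loki's internal wrapper and likely configures more than this, and the broker address and client ID below are placeholder assumptions:

```go
package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// A bare client with no consumer group; partitions are attached
	// explicitly later via AddConsumePartitions (see loadRecords below).
	c, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),      // assumed broker address
		kgo.ClientID("block-builder-worker-0"), // assumed client ID
	)
	if err != nil {
		fmt.Println("create client:", err)
		return
	}
	defer c.Close()
}
```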
@@ -112,13 +114,14 @@ type BlockBuilder struct {
 
 	id              string
 	cfg             Config
+	kafkaCfg        kafka.Config
 	periodicConfigs []config.PeriodConfig
-	metrics *builderMetrics
-	logger  log.Logger
 
-	decoder       *kafka.Decoder
-	readerFactory func(partition int32) (partition.Reader, error)
+	metrics    *builderMetrics
+	logger     log.Logger
+	registerer prometheus.Registerer
 
+	decoder  *kafka.Decoder
 	store    stores.ChunkWriter
 	objStore *MultiStore
 
@@ -129,34 +132,36 @@ func NewBlockBuilder(
 func NewBlockBuilder(
 	id string,
 	cfg Config,
+	kafkaCfg kafka.Config,
 	periodicConfigs []config.PeriodConfig,
-	readerFactory func(partition int32) (partition.Reader, error),
 	store stores.ChunkWriter,
 	objStore *MultiStore,
 	logger log.Logger,
-	reg prometheus.Registerer,
+	registerer prometheus.Registerer,
 ) (*BlockBuilder,
 	error) {
 	decoder, err := kafka.NewDecoder()
 	if err != nil {
 		return nil, err
 	}
 
-	t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, reg)
+	t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, registerer)
 	if err != nil {
 		return nil, fmt.Errorf("create grpc transport: %w", err)
 	}
 
 	i := &BlockBuilder{
 		id:               id,
 		cfg:              cfg,
+		kafkaCfg:         kafkaCfg,
 		periodicConfigs:  periodicConfigs,
-		metrics:          newBuilderMetrics(reg),
+		metrics:          newBuilderMetrics(registerer),
 		logger:           logger,
+		registerer:       registerer,
 		decoder:          decoder,
-		readerFactory:    readerFactory,
 		store:            store,
 		objStore:         objStore,
+		inflightJobs:     make(map[string]*types.Job),
 		BuilderTransport: t,
 	}
 
@@ -165,20 +170,26 @@ func NewBlockBuilder(
 }
 
 func (i *BlockBuilder) running(ctx context.Context) error {
-	wg := sync.WaitGroup{}
-
+	errgrp, ctx := errgroup.WithContext(ctx)
 	for j := 0; j < i.cfg.WorkerParallelism; j++ {
-		wg.Add(1)
-		go func(id string) {
-			defer wg.Done()
+		workerID := fmt.Sprintf("block-builder-worker-%d", j)
+		errgrp.Go(func() error {
+			c, err := client.NewReaderClient(
+				i.kafkaCfg,
+				client.NewReaderClientMetrics(workerID, i.registerer),
+				log.With(i.logger, "component", workerID),
+			)
+			if err != nil {
+				return err
+			}
 
 			var waitFor time.Duration
 			for {
 				select {
 				case <-ctx.Done():
-					return
+					return nil
 				case <-time.After(waitFor):
-					gotJob, err := i.runOne(ctx, id)
+					gotJob, err := i.runOne(ctx, c, workerID)
 					if err != nil {
 						level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
 					}
@@ -191,30 +202,27 @@ func (i *BlockBuilder) running(ctx context.Context) error {
 					}
 				}
 			}
-		}(fmt.Sprintf("worker-%d", j))
-	}
 
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
+		})
+	}
 
+	errgrp.Go(func() error {
 		ticker := time.NewTicker(i.cfg.SyncInterval)
 		defer ticker.Stop()
 
 		for {
 			select {
 			case <-ctx.Done():
-				return
+				return nil
 			case <-ticker.C:
 				if err := i.syncJobs(ctx); err != nil {
 					level.Error(i.logger).Log("msg", "failed to sync jobs", "err", err)
 				}
 			}
 		}
-	}()
+	})
 
-	wg.Wait()
-	return nil
+	return errgrp.Wait()
 }
 
 func (i *BlockBuilder) syncJobs(ctx context.Context) error {
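
Replacing `sync.WaitGroup` with `errgroup` changes the shutdown semantics: a non-nil error from any worker cancels the shared context (stopping the remaining workers and the sync loop), and `Wait` returns the first error instead of always returning nil. A minimal, self-contained sketch of the pattern, with the worker count and the injected failure chosen purely for illustration:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	errgrp, ctx := errgroup.WithContext(context.Background())

	for j := 0; j < 3; j++ {
		j := j
		errgrp.Go(func() error {
			if j == 1 {
				// Returning an error cancels ctx for every other goroutine.
				return errors.New("worker 1 failed")
			}
			<-ctx.Done() // the remaining workers observe the cancellation
			return nil
		})
	}

	// Wait blocks until all goroutines return and yields the first error.
	fmt.Println(errgrp.Wait()) // prints: worker 1 failed
}
```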
@@ -233,7 +241,7 @@ func (i *BlockBuilder) syncJobs(ctx context.Context) error {
 	return nil
 }
 
-func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error) {
+func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID string) (bool, error) {
 	// assuming GetJob blocks/polls until a job is available
 	resp, err := i.SendGetJobRequest(ctx, &types.GetJobRequest{
 		BuilderID: workerID,
@@ -266,7 +274,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
 		Job:     job,
 		Success: true,
 	}
-	if _, err = i.processJob(ctx, job, logger); err != nil {
+	if _, err = i.processJob(ctx, c, job, logger); err != nil {
 		level.Error(i.logger).Log("msg", "failed to process job", "err", err)
 		completion.Success = false
 	}
@@ -292,7 +300,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
 	return true, err
 }
 
-func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
+func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
 	level.Debug(logger).Log("msg", "beginning job")
 
 	indexer := newTsdbCreator()
@@ -316,7 +324,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
 		"load records",
 		1,
 		func(ctx context.Context) error {
-			lastOffset, err = i.loadRecords(ctx, job.Partition(), job.Offsets(), inputCh)
+			lastOffset, err = i.loadRecords(ctx, c, job.Partition(), job.Offsets(), inputCh)
 			return err
 		},
 		func(ctx context.Context) error {
@@ -502,47 +510,59 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
 	return lastOffset, nil
 }
 
-func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
-	f, err := i.readerFactory(partitionID)
-	if err != nil {
-		return 0, err
-	}
-
-	f.SetOffsetForConsumption(offsets.Min)
+func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
+	// Use NoResetOffset to avoid resetting the offset to the beginning of the partition when the requested offset is out of range.
+	// This can happen if the requested records are already outside the retention period. We should fail the job in such cases, leaving the scheduler to make a decision.
+	c.AddConsumePartitions(map[string]map[int32]kgo.Offset{
+		i.kafkaCfg.Topic: {partitionID: kgo.NoResetOffset().At(offsets.Min)},
+	})
+	defer c.RemoveConsumePartitions(map[string][]int32{
+		i.kafkaCfg.Topic: {partitionID},
+	})
 
 	var (
-		lastOffset = offsets.Min - 1
-		boff       = backoff.New(ctx, i.cfg.Backoff)
+		lastConsumedOffset = offsets.Min - 1
+		lastSeenOffset     = offsets.Min - 1
+		boff               = backoff.New(ctx, i.cfg.Backoff)
 	)
 
-	for lastOffset < offsets.Max && boff.Ongoing() {
-		var records []partition.Record
-		records, err = f.Poll(ctx, int(offsets.Max-lastOffset))
-		if err != nil {
+	for lastSeenOffset < offsets.Max && boff.Ongoing() {
+		if err := context.Cause(ctx); err != nil {
+			return 0, err
+		}
+
+		fs := c.PollRecords(ctx, int(offsets.Max-lastConsumedOffset))
+		// TODO: better error handling for non-retryable errors.
+		// We don't have to iterate over all errors since we only fetch a single partition.
+		if err := fs.Err(); err != nil {
+			level.Error(i.logger).Log("msg", "failed to poll records", "err", err)
 			boff.Wait()
 			continue
 		}
 
-		if len(records) == 0 {
+		if fs.Empty() {
 			// No more records available
 			break
 		}
 
 		// Reset backoff on successful poll
 		boff.Reset()
 
-		converted := make([]AppendInput, 0, len(records))
-		for _, record := range records {
+		converted := make([]AppendInput, 0, fs.NumRecords())
+		for iter := fs.RecordIter(); !iter.Done(); {
+			record := iter.Next()
+			lastSeenOffset = record.Offset
 			if record.Offset >= offsets.Max {
 				level.Debug(i.logger).Log("msg", "record offset exceeds job max offset. stop processing", "record offset", record.Offset, "max offset", offsets.Max)
 				break
 			}
-			lastOffset = record.Offset
 
-			stream, labels, err := i.decoder.Decode(record.Content)
+			stream, labels, err := i.decoder.Decode(record.Value)
 			if err != nil {
 				return 0, fmt.Errorf("failed to decode record: %w", err)
 			}
+
+			lastConsumedOffset = record.Offset
 			if len(stream.Entries) == 0 {
 				continue
 			}
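
A condensed, self-contained sketch of the consumption pattern above: attach a single partition at a fixed start offset with `kgo.NoResetOffset()` (an out-of-range offset then surfaces as a fetch error rather than silently restarting from the partition's beginning), poll until the job's max offset, then detach. The broker address, topic name, and offsets are placeholder assumptions:

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// Assumed broker address; no consumer group is used.
	c, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer c.Close()

	const topic = "loki.write" // assumed topic name
	var (
		partition = int32(0)
		minOffset = int64(100) // assumed job bounds
		maxOffset = int64(200)
	)

	// NoResetOffset: if minOffset already fell out of retention, polling
	// reports an error instead of resetting to the partition start.
	c.AddConsumePartitions(map[string]map[int32]kgo.Offset{
		topic: {partition: kgo.NoResetOffset().At(minOffset)},
	})
	defer c.RemoveConsumePartitions(map[string][]int32{topic: {partition}})

	for last := minOffset - 1; last < maxOffset; {
		fs := c.PollRecords(context.Background(), int(maxOffset-last))
		if err := fs.Err(); err != nil {
			fmt.Println("poll:", err)
			return
		}
		if fs.Empty() {
			break // no more records in the partition
		}
		for iter := fs.RecordIter(); !iter.Done(); {
			rec := iter.Next()
			if rec.Offset >= maxOffset {
				return // reached the job's upper bound
			}
			last = rec.Offset
			fmt.Printf("offset=%d tenant=%s\n", rec.Offset, string(rec.Key))
		}
	}
}
```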
@@ -552,7 +572,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
 			copy(entries, stream.Entries)
 
 			converted = append(converted, AppendInput{
-				tenant:    record.TenantID,
+				tenant:    string(record.Key),
 				labels:    labels,
 				labelsStr: stream.Labels,
 				entries:   entries,
@@ -568,7 +588,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
 		}
 	}
 
-	return lastOffset, boff.Err()
+	return lastConsumedOffset, boff.Err()
 }
 
 func withBackoff[T any](
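
The diff ends mid-signature at `withBackoff`, so its body is not shown here. Purely as a hypothetical sketch of the shape such a generic retry helper could take on top of dskit's backoff (not necessarily the actual Loki implementation):

```go
package retry

import (
	"context"

	"github.com/grafana/dskit/backoff"
)

// withBackoff retries fn under the given backoff config, returning the
// zero value of T together with the final error once retries are exhausted.
// Hypothetical sketch only; the real helper's body is not part of this diff.
func withBackoff[T any](
	ctx context.Context,
	cfg backoff.Config,
	fn func() (T, error),
) (T, error) {
	var zero T
	boff := backoff.New(ctx, cfg)
	for boff.Ongoing() {
		if res, err := fn(); err == nil {
			return res, nil
		}
		boff.Wait()
	}
	return zero, boff.Err()
}
```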