The dead-simple batching solution for Go applications.
With batcher you can easily batch operations and process them asynchronously:
```go
package main

import (
	"fmt"
	"time"

	"github.com/NSXBet/batcher/pkg/batcher"
)

type BatchItem struct {
	ID   int
	Name string
}

func main() {
	// create a batcher
	batcher := batcher.New[*BatchItem](
		batcher.WithBatchSize[*BatchItem](100),               // will batch every 100 items,
		batcher.WithBatchInterval[*BatchItem](1*time.Second), // or every second.
		// then run this processor with each batch
		batcher.WithProcessor(func(items []*BatchItem) error {
			fmt.Printf("processing batch with %d items...\n", len(items))
			// do your thing :)
			return nil
		}),
	)

	// stop the batcher
	defer batcher.Close()

	// add operations to the batcher
	for i := 0; i < 1000; i++ {
		batcher.Add(&BatchItem{
			ID:   i,
			Name: fmt.Sprintf("item-%d", i),
		})
	}

	// wait for all batches to process...
	timeout := 10 * time.Second
	if err := batcher.Join(timeout); err != nil {
		fmt.Printf("timeout error: %v\n", err)
		return
	}

	// You should see something like (10 times):
	// processing batch with 100 items...
	// processing batch with 100 items...
	// ...
}
```

Just `go get -u github.com/NSXBet/batcher` and you're ready to go!
To create a batcher, use the New function:

```go
batcher := batcher.New[*BatchItem](
	batcher.WithBatchSize[*BatchItem](100),               // will batch every 100 items,
	batcher.WithBatchInterval[*BatchItem](1*time.Second), // or every second.
	// then run this processor with each batch
	batcher.WithProcessor(func(items []*BatchItem) error {
		fmt.Printf("processing batch with %d items...\n", len(items))
		// do your thing :)
		return nil
	}),
)
```

You can pass a processor in the form of a function with the signature `func(items []*BatchItem) error` to the batcher:
```go
batcher := batcher.New[*BatchItem](
	batcher.WithProcessor(func(items []*BatchItem) error {
		return nil
	}),
)
```

This function will be called with each batch of items to process.
You can also use a struct in order to have access to any dependencies you require:

```go
// 1. Create a Processor struct with all the dependencies you need.
type Processor struct {
	logger *zap.Logger
}

func NewProcessor() (*Processor, error) {
	logger, err := zap.NewDevelopment() // or whatever dependency you need
	if err != nil {
		return nil, err
	}

	return &Processor{
		logger: logger,
	}, nil
}

// 2. Implement the Processor interface function.
// Here you get to use any dependencies you injected into the processor.
func (p *Processor) Process(items []*BatchItem) error {
	p.logger.Info("processing items", zap.Int("count", len(items)))
	return nil
}

// 3. Later, when you are creating the batcher, pass the processor.Process function
// to the WithProcessor option to wire the batcher to your processor struct.
processor, err := NewProcessor()
if err != nil {
	log.Fatalf("error creating processor: %v", err)
}

batcher := batcher.New[*BatchItem](
	batcher.WithProcessor(processor.Process),
)
```

To add items to the batcher, use the Add function:
```go
for i := 0; i < 1000; i++ {
	batcher.Add(&BatchItem{
		ID:   i,
		Name: fmt.Sprintf("item-%d", i),
	})
}
```

To wait for all batches to process, use the Join function:
```go
timeout := 10 * time.Second
if err := batcher.Join(timeout); err != nil {
	fmt.Printf("timeout error: %v\n", err)
}
```

To stop the batcher, use the Close function:
```go
defer batcher.Close()

// batcher.IsClosed() == true after this point
```

This function is safe to call multiple times, as it will only stop the processor once.
Whenever the processor function returns an error, the batcher sends the error on the Errors() channel:

```go
for err := range batcher.Errors() {
	fmt.Printf("error processing batch: %v\n", err)
}
```

You can get how many items are in the batcher by using the Len function:
```go
fmt.Printf("batcher has %d items\n", batcher.Len())
```

Available options:
- `WithBatchSize[*BatchItem](size int)`: sets the batch size.
- `WithBatchInterval[*BatchItem](interval time.Duration)`: sets the batch interval.
- `WithProcessor(func(items []*BatchItem) error)`: sets the processor function.
The batcher can be easily integrated with uber-go/fx for dependency injection and lifecycle management. Here's how to use it:
```go
package main

import (
	"time"

	"go.uber.org/fx"
	"go.uber.org/zap"

	"github.com/NSXBet/batcher/pkg/batcher"
)

type BatchItem struct {
	ID   int
	Name string
}

// RequestHandler handles incoming requests and enqueues them to the batcher
type RequestHandler struct {
	batcher *batcher.Batcher[*BatchItem]
}

// NewRequestHandler creates a new request handler with batcher dependency
func NewRequestHandler(b *batcher.Batcher[*BatchItem]) *RequestHandler {
	return &RequestHandler{
		batcher: b,
	}
}

// HandleRequest processes a single request by enqueueing it to the batcher
func (h *RequestHandler) HandleRequest(id int, name string) error {
	h.batcher.Add(&BatchItem{
		ID:   id,
		Name: name,
	})

	return nil
}

// Processor handles the actual batch processing
type Processor struct {
	logger *zap.Logger
}

// NewProcessor creates a new processor with its dependencies
func NewProcessor(logger *zap.Logger) *Processor {
	return &Processor{
		logger: logger,
	}
}

// Process implements the batch processing logic
func (p *Processor) Process(items []*BatchItem) error {
	p.logger.Info("processing items", zap.Int("count", len(items)))
	return nil
}

func main() {
	app := fx.New(
		// Provide the processor
		fx.Provide(NewProcessor),
		// Add the batcher module with a processor function that resolves dependencies
		batcher.ProvideBatcherInFX[*BatchItem](
			// This function is an FX resolver that will be called with the processor dependency
			func(processor *Processor) batcher.Processor[*BatchItem] {
				// Return the processor's Process method as the batch processor
				return processor.Process
			},
			2,                    // batch size
			time.Millisecond*100, // batch interval
		),
		// Provide the request handler
		fx.Provide(NewRequestHandler),
	)

	// Start the app
	app.Run()
}
```

The key part of the FX integration is the processor function passed to `ProvideBatcherInFX`. This function is an FX resolver that:
- Takes any dependencies you need (like the `Processor` struct) as parameters
- Returns a `batcher.Processor[T]` function that will be used to process batches
- Can use any of the injected dependencies to implement the processing logic
This allows you to:
- Have access to all your dependencies in the processor function
- Keep your processor logic in a separate struct with its own dependencies
- Let FX handle the dependency injection and lifecycle management
The batcher will be automatically started when the FX app starts and stopped when the app stops.
Just run `make unit` to run all tests.
We strive for 100% test coverage, but for now we're close to 95%. It will do for now.
Just run `make bench` to run all benchmarks.
For the most up-to-date benchmarks in this repository, you can access this page. The benchmarks are re-run every time a PR is merged into the main branch.
Our benchmarks are divided by batch size and should look like this (actual results depend on your machine):
```
Running benchmarks...
2024-06-07T00:21:10.619-0300	INFO	test/helpers.go:30	processing items	{"count": 1000}
goos: linux
goarch: amd64
pkg: github.com/NSXBet/batcher/pkg/batcher
cpu: Intel(R) Core(TM) i9-14900KF
BenchmarkBatcherBatchSize10-24         4588717    255.4 ns/op
BenchmarkBatcherBatchSize100-24        5017683    254.8 ns/op
BenchmarkBatcherBatchSize1_000-24      4721426    235.4 ns/op
BenchmarkBatcherBatchSize10_000-24     4603827    245.5 ns/op
BenchmarkBatcherBatchSize100_000-24    4848703    244.8 ns/op
PASS
ok      github.com/NSXBet/batcher/pkg/batcher    26.988s
```

These benchmarks account for the time it takes to add items to the batcher, not the time to process the batches, as that varies with the processor function you pass to the batcher.
MIT.
Feel free to open issues and send PRs.