db.go (226 changes: 187 additions & 39 deletions)
@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"runtime"
"sort"
@@ -91,6 +92,7 @@ type DB struct {

// MaxBatchSize is the maximum size of a batch. Default value is
// copied from DefaultMaxBatchSize in Open.
// When MaxBatchSize is exceeded, BatchProcessor writes the full batch without delay.
//
// If <=0, disables batching.
//
@@ -99,12 +101,22 @@ type DB struct {

// MaxBatchDelay is the maximum delay before a batch starts.
// Default value is copied from DefaultMaxBatchDelay in Open.
// BatchProcessor waits up to this long to accumulate calls before writing a batch.
// The timer is reset whenever a batch has been written.
//
// If <=0, effectively disables batching.
// If <=0, disables delayed batching.
//
// Do not change concurrently with calls to Batch.
MaxBatchDelay time.Duration

// MaxBatchQueue limits the number of batches queued to BatchProcessor.
// Default value is copied from DefaultMaxBatchQueue in Open.
// Higher values consume more memory, as more batches can queue up on slow storage.
//
// If == 1 (the default), there can be one open batch plus one in execution, which is usually enough.
// If == 0, the channel is unbuffered and works like a flip-flop without a queue.
//
// Do not change concurrently with calls to Batch.
MaxBatchQueue int

// AllocSize is the amount of space allocated when the database
// needs to create new pages. This is done to amortize the cost
// of truncate() and fsync() when growing the data file.
@@ -139,11 +151,14 @@ type DB struct {

batchMu sync.Mutex
batch *batch
batChan chan *batch // full batches queued for BatchProcessor
bPruns chan struct{} // holds one token while BatchProcessor is running

rwlock sync.Mutex // Allows only one writer at a time.
metalock sync.Mutex // Protects meta page access.
mmaplock sync.RWMutex // Protects mmap access during remapping.
statlock sync.RWMutex // Protects stats access.
bpmux sync.Mutex // serializes BatchProcessor startup and shutdown

ops struct {
writeAt func(b []byte, off int64) (n int, err error)
@@ -190,9 +205,10 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
db.Mlock = options.Mlock

// Set default values for later DB operations.
db.MaxBatchSize = common.DefaultMaxBatchSize
db.MaxBatchDelay = common.DefaultMaxBatchDelay
db.AllocSize = common.DefaultAllocSize
db.AllocSize = options.AllocSize
db.MaxBatchDelay = options.MaxBatchDelay
db.MaxBatchQueue = options.MaxBatchQueue
db.MaxBatchSize = options.MaxBatchSize

flag := os.O_RDWR
if options.ReadOnly {
@@ -292,6 +308,10 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
}
}

// The new goroutine locks bpmux first, so it blocks until Open returns
// and batch processing only begins on a fully opened DB.
db.bpmux.Lock()
defer db.bpmux.Unlock()
go db.BatchProcessor()

// Mark the database as opened and return.
return db, nil
}
@@ -631,6 +651,25 @@ func (db *DB) init() error {
// It will block waiting for any open transactions to finish
// before closing the database and returning.
func (db *DB) Close() error {

if db.batChan != nil {
// Sending a nil pointer down the channel asks BatchProcessor to quit.
// Never close `db.batChan` itself: there may still be senders, and a
// panic here would crash the DB.
db.batChan <- nil
}

// Wait until BatchProcessor has drained its run token and exited.
for {
time.Sleep(10 * time.Millisecond)
if len(db.bPruns) == 0 {
break
}
log.Printf("bbolt Close waiting for batchProcessor, queued=%d", len(db.batChan))
}

db.bpmux.Lock()
defer db.bpmux.Unlock()

db.rwlock.Lock()
defer db.rwlock.Unlock()

@@ -932,18 +971,26 @@ func (db *DB) Batch(fn func(*Tx) error) error {
errCh := make(chan error, 1)

db.batchMu.Lock()
if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
// There is no existing batch, or the existing batch is full; start a new one.
db.batch = &batch{
db: db,
}
db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
// Check whether the existing batch has reached MaxBatchSize.
if db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize {
// Take the pointer and reset db.batch so new calls start a fresh batch.
batptr := db.batch
db.batch = nil
// Unlock before sending to the channel so BatchProcessor can drain it.
db.batchMu.Unlock()
// Hand the full batch over to BatchProcessor.
db.batChan <- batptr
//log.Printf("Batch passed calls=%d to batChan db.MaxBatchSize=%d", len(batptr.calls), db.MaxBatchSize)
// Lock again to create a new batch and append the current call.
db.batchMu.Lock()
}
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
if len(db.batch.calls) >= db.MaxBatchSize {
// wake up batch, it's ready to run
go db.batch.trigger()

if db.batch == nil {
db.batch = &batch{}
}

db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
db.batchMu.Unlock()

err := <-errCh
@@ -953,39 +1000,129 @@
return err
}
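
As a reading aid (not part of this diff), here is a hedged caller-side sketch of how Batch is typically exercised: many goroutines submit small write functions, and the machinery above coalesces them into fewer read-write transactions. The bucket and key names are made up, and the package is assumed to be imported as bolt.

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
	i := i
	wg.Add(1)
	go func() {
		defer wg.Done()
		_ = db.Batch(func(tx *bolt.Tx) error {
			b, err := tx.CreateBucketIfNotExists([]byte("events"))
			if err != nil {
				return err
			}
			return b.Put([]byte(fmt.Sprintf("k%03d", i)), []byte("v"))
		})
	}()
}
wg.Wait()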

func (db *DB) BatchProcessor() {
// Hold bpmux for the whole life of the processor; it is released when the forever loop breaks and this function returns.
db.bpmux.Lock()
defer db.bpmux.Unlock()

maxBatchQueue := db.MaxBatchQueue
if db.batChan == nil {
db.batChan = make(chan *batch, maxBatchQueue)
db.bPruns = make(chan struct{}, 1)
}

select {
case db.bPruns <- struct{}{}:
// Mark that a BatchProcessor is now running.
default:
// Another BatchProcessor already holds the run token.
log.Printf("ERROR bbolt BatchProcessor already running, db.bPruns is full")
return
}

// define loop vars
var batptr *batch
var batchcap = cap(db.batChan)
var maxbatch = db.MaxBatchSize
var maxdelay = db.MaxBatchDelay
var maxqueue = db.MaxBatchQueue
var timer *time.Timer
// Keep a separate channel: a nil *time.Timer would panic on timer.C in the
// select below, so timerC stays nil (blocks forever) when delayed batching is disabled.
var timerC <-chan time.Time
if maxdelay > 0 {
timer = time.NewTimer(maxdelay)
timerC = timer.C
}
var timeout, ok bool
log.Printf("BatchProcessor maxqueue=%d batchcap=%d maxbatch=%d maxdelay=%d", maxqueue, batchcap, maxbatch, maxdelay)

//now := time.Now().UnixNano()
//lastrun := now
forever:
for {

// In this forever loop: don't use return!
// Only `break forever`, so the run token in `bPruns` gets drained afterwards.
// BatchProcessor can be stopped by sending a nil pointer down the channel
// and started again later if needed.
// Note: batChan is normally never closed, but ok is still checked in case it is.
select {
case <-timerC:
timeout = true
//now = time.Now().UnixNano()
//diff := now - lastrun
//if diff > 0 {
// log.Printf("bP timeout lr=(%d mils)", (now - lastrun) / 1e6)
//}
//lastrun = now

case batptr, ok = <-db.batChan: // receives a pointer to a batch containing `calls []call`
if batptr == nil || !ok {
// A nil pointer was received or the channel was closed.
//log.Printf("bbolt BatchProcessor break")
break forever // run the cleanup after the forever loop
}
}
// Left the select: either the timer fired or a batch was received.
if timeout {
// MaxBatchDelay timeout triggered
db.batchMu.Lock()
if db.batch == nil || len(db.batch.calls) == 0 {
// Nothing to do; unlock and keep waiting.
db.batchMu.Unlock()
timeout = false
timer.Reset(maxdelay)
continue forever
}
// There are waiting calls in db.batch that should be flushed.
// Take the pointer and reset db.batch.
batptr = db.batch
db.batch = nil
// Safe to unlock here; the batch now belongs to this goroutine.
db.batchMu.Unlock()
// Run the batch below because the timeout fired.
// DEBUG POINT
//log.Printf("BP timeout Q=%d/%d | batches=%d/%d maxdelay=(%d mils)", len(db.batChan), batchcap, len(batptr.calls), maxbatch, maxdelay/1e6)

} else {
// Received a *batch as batptr from channel db.batChan.
// DEBUG POINT
//log.Printf("BP batChan Q=%d/%d | batches=%d/%d", len(db.batChan), batchcap, len(batptr.calls), maxbatch)
} // end if else timeout

db.runBatch(batptr)

timeout, batptr = false, nil
if maxdelay > 0 {
timer.Reset(maxdelay)
}
continue forever
} // end forever
// Process any batch that is still open.
// Callers should stop sending batches before closing the DB.
db.batchMu.Lock()
if db.batch != nil && len(db.batch.calls) > 0 {
db.runBatch(db.batch)
}
db.batchMu.Unlock()
<-db.bPruns // drain the run token; BatchProcessor has stopped
//log.Printf("bbolt BatchProcessor quit")
} // end func BatchProcessor
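
The stop/start handshake used by Close and Open (send a nil *batch, wait for the run token in bPruns to drain, then launch a new goroutine) could be wrapped in a small package-internal helper. The sketch below is hypothetical and not part of this PR; it only restates the semantics shown above.

// restartBatchProcessor stops a running BatchProcessor and starts a fresh one.
func (db *DB) restartBatchProcessor() {
	if db.batChan != nil {
		db.batChan <- nil // a nil *batch asks the processor to leave its loop
	}
	for len(db.bPruns) > 0 {
		time.Sleep(10 * time.Millisecond) // wait until the run token has been drained
	}
	go db.BatchProcessor()
}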

type call struct {
fn func(*Tx) error
err chan<- error
}

// batch collects calls and is passed as batptr to BatchProcessor and runBatch.
type batch struct {
db *DB
timer *time.Timer
start sync.Once
calls []call
}

// trigger runs the batch if it hasn't already been run.
func (b *batch) trigger() {
b.start.Do(b.run)
}

// run performs the transactions in the batch and communicates results
// back to DB.Batch.
func (b *batch) run() {
b.db.batchMu.Lock()
b.timer.Stop()
// Make sure no new work is added to this batch, but don't break
// other batches.
if b.db.batch == b {
b.db.batch = nil
}
b.db.batchMu.Unlock()

func (db *DB) runBatch(b *batch) {
//log.Printf("runBatch recv=%d", len(bc))
retry:
for len(b.calls) > 0 {
var failIdx = -1
err := b.db.Update(func(tx *Tx) error {
err := db.Update(func(tx *Tx) error {
for i, c := range b.calls {
if err := safelyCall(c.fn, tx); err != nil {
failIdx = i
@@ -996,12 +1133,12 @@
})

if failIdx >= 0 {
// take the failing transaction out of the batch. it's
// safe to shorten b.calls here because db.batch no longer
// points to us, and we hold the mutex anyway.
// take the failing transaction out of the batch;
// it's safe to shorten b.calls here because this *batch belongs to runBatch now.
c := b.calls[failIdx]
b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
// tell the submitter to re-run it solo, then continue with the rest of the batch
//log.Printf("ERROR runBatch db.Update err = trySolo")
c.err <- trySolo
continue retry
}
@@ -1010,6 +1147,8 @@
for _, c := range b.calls {
c.err <- err
}

b = nil
break retry
}
}
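
Because runBatch removes a failing call and tells its submitter to re-run it solo, a function handed to Batch may end up being invoked more than once and should therefore be idempotent. A hedged caller-side illustration (bucket, key and value are made up):

err := db.Batch(func(tx *bolt.Tx) error {
	b, err := tx.CreateBucketIfNotExists([]byte("users"))
	if err != nil {
		return err
	}
	// Idempotent: re-running after a retry leaves the same end state.
	return b.Put([]byte("42"), []byte(`{"name":"alice"}`))
})
if err != nil {
	log.Printf("batch failed: %v", err)
}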
Expand Down Expand Up @@ -1268,6 +1407,11 @@ type Options struct {
// It prevents potential page faults, however
// used memory can't be reclaimed. (UNIX only)
Mlock bool

AllocSize int // copied to DB.AllocSize in Open
MaxBatchDelay time.Duration // copied to DB.MaxBatchDelay in Open
MaxBatchQueue int // copied to DB.MaxBatchQueue in Open
MaxBatchSize int // copied to DB.MaxBatchSize in Open
}

// DefaultOptions represent the options used if nil options are passed into Open().
@@ -1276,6 +1420,10 @@ var DefaultOptions = &Options{
Timeout: 0,
NoGrowSync: false,
FreelistType: FreelistArrayType,
AllocSize: common.DefaultAllocSize,
MaxBatchDelay: common.DefaultMaxBatchDelay,
MaxBatchQueue: common.DefaultMaxBatchQueue,
MaxBatchSize: common.DefaultMaxBatchSize,
}
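
With these fields on Options, callers can tune batching at Open time instead of mutating the DB fields afterwards. A hedged sketch, assuming the package is imported as bolt; the path and values are illustrative:

opts := *bolt.DefaultOptions // copy the defaults, then override the batching knobs
opts.MaxBatchSize = 5000
opts.MaxBatchDelay = 25 * time.Millisecond
opts.MaxBatchQueue = 4
opts.AllocSize = 32 * 1024 * 1024

db, err := bolt.Open("/tmp/app.db", 0600, &opts)
if err != nil {
	log.Fatal(err)
}
defer db.Close()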

// Stats represents statistics about the database.
internal/common/types.go (1 change: 1 addition & 0 deletions)
@@ -30,6 +30,7 @@ const IgnoreNoSync = runtime.GOOS == "openbsd"
const (
DefaultMaxBatchSize int = 1000
DefaultMaxBatchDelay = 10 * time.Millisecond
DefaultMaxBatchQueue = 1
DefaultAllocSize = 16 * 1024 * 1024
)
