A robust blockchain block queue implementation with built-in reorganization protection for Ethereum-compatible chains. ReorgQueue provides a reliable way to process blockchain data while handling chain reorganizations gracefully.
- Reorg Protection: Maintains a configurable buffer of recent blocks to handle chain reorganizations
- Persistent Storage: SQLite-based storage for reliability across restarts
- Memory Cache: In-memory cache for recent blocks to improve performance
- Batch Operations: Support for batch block insertions for efficiency
- Backoff Mechanism: Configurable exponential backoff when queue is full
- Metrics: Prometheus metrics for monitoring queue operations
- Trace Support: Stores multiple types of blockchain traces including:
- Call traces
- Prestate traces
- Keccak256 preimage traces
- State access traces
go get github.com/draganm/reorgqueuepackage main
import (
"context"
"database/sql"
"log"
"github.com/draganm/reorgqueue"
"github.com/ethereum/go-ethereum/common"
_ "github.com/mattn/go-sqlite3"
)
func main() {
// Open database connection
db, err := sql.Open("sqlite3", "blocks.db")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// Create reorg queue with default configuration
writer, consumer, err := reorgqueue.NewReorgQueueWithDefaults(db)
if err != nil {
log.Fatal(err)
}
// Add a block
ctx := context.Background()
block := reorgqueue.Block{
Number: 1000,
Hash: common.HexToHash("0x123..."),
Parent: common.HexToHash("0x122..."),
Block: []byte("block data"),
}
err = writer.AddBlock(ctx, block)
if err != nil {
log.Printf("Failed to add block: %v", err)
}
// Consume blocks
blocks, err := consumer.GetBlocks(ctx, 10)
if err != nil {
log.Printf("Failed to get blocks: %v", err)
}
// Process blocks...
// Delete processed blocks
if len(blocks) > 0 {
err = consumer.DeleteBlocks(ctx, blocks[0].Number, blocks[len(blocks)-1].Number)
if err != nil {
log.Printf("Failed to delete blocks: %v", err)
}
}
}config := reorgqueue.Config{
MaxBlocks: 5000, // Maximum blocks to store
ReorgBuffer: 128, // Blocks to keep for reorg protection
InitialBackoff: 100 * time.Millisecond,
MaxBackoff: 30 * time.Second,
BackoffMultiplier: 2.0,
}
writer, consumer, err := reorgqueue.NewReorgQueue(db, config)AddBlock(ctx context.Context, block Block) error- Add a single block to the queueAddBlocksBatch(ctx context.Context, blocks []Block) error- Add multiple blocks in a batch
GetBlocks(ctx context.Context, maxBlocks uint) ([]Block, error)- Retrieve blocks from the queueDeleteBlocks(ctx context.Context, startBlock, endBlock uint64) error- Delete processed blocksGetHighestBlock(ctx context.Context) (uint64, error)- Get the highest block number in queueGetBlockCount(ctx context.Context) (uint, error)- Get total number of blocks in queueGetTailBlock() (number uint64, hash common.Hash, exists bool)- Get the tail block information
type Block struct {
Number uint64
Hash common.Hash
Parent common.Hash
Block []byte
Receipts []byte
CallTraces []byte
PrestateTraces []byte
Keccak256PreimageTraces []byte
StateAccessTraces []byte
}- Block Addition: When blocks are added, they are validated against the parent chain
- Reorg Detection: If a block with a different parent is added at an existing height, older blocks are removed
- Buffer Protection: The most recent N blocks (ReorgBuffer) are kept and not available for consumption
- Consumption: Only blocks older than the reorg buffer are available for processing
- Tail Tracking: The system maintains a "tail" pointer to track processed blocks
| Option | Default | Description |
|---|---|---|
MaxBlocks |
2000 | Maximum number of blocks to store in the queue |
ReorgBuffer |
64 | Number of recent blocks to keep for reorganization protection |
InitialBackoff |
100ms | Initial backoff duration when queue is full |
MaxBackoff |
5s | Maximum backoff duration |
BackoffMultiplier |
1.5 | Multiplier for exponential backoff |
The following Prometheus metrics are exposed:
reorgqueue_blocks_inserted_total- Total number of blocks inserted (with status label)reorgqueue_blocks_deleted_total- Total number of blocks deletedreorgqueue_reorgs_detected_total- Total number of reorganizations detectedreorgqueue_cache_hits_total- Cache hit countreorgqueue_cache_misses_total- Cache miss countreorgqueue_cache_size- Current cache sizereorgqueue_queue_size- Current queue sizereorgqueue_operation_duration_seconds- Operation duration histogram
- Go 1.21 or higher
- SQLite3
- sqlc (for regenerating SQL code)
go test ./...If you modify the SQL schema or queries:
cd sqlitestore
sqlc generateThis project is licensed under the GNU Affero General Public License v3.0 (AGPL-3.0). See the LICENSE file for details.
Contributions are welcome! Please feel free to submit a Pull Request.
For issues and questions, please use the GitHub issue tracker.