Stop writing plumbing. Start shipping features.
Protoflow is a productivity layer for Watermill that simplifies event-driven architecture. It manages routers, publishers, subscribers, and middleware so you can focus on your domain logic.
Whether you are using Protobufs or JSON, Protoflow provides a type-safe, production-ready foundation for Kafka, RabbitMQ, AWS SNS/SQS, NATS, HTTP, and Go Channels.
- Type-Safe Handlers: Generic `RegisterProtoHandler` and `RegisterJSONHandler` helpers keep your code clean.
- 7 Built-in Transports: Kafka, RabbitMQ, AWS SNS/SQS, NATS, HTTP, File I/O, and Go Channels. Switch with a single config change.
- Batteries Included: Default middleware stack with correlation IDs, structured logging, protobuf validation, outbox pattern, OpenTelemetry tracing, Prometheus metrics, retries with exponential backoff, poison queues, and panic recovery.
- Pluggable Logging: Bring your own logger (slog, logrus, zerolog) via the `ServiceLogger` abstraction.
- Safe Configuration: Built-in validation with credential redaction in logs.
- Graceful Lifecycle: Clean shutdown of HTTP servers and message routers.
- Extensible: Custom transport factories, middleware, validators, and outbox stores.
- Install: `go get github.com/drblury/protoflow` (Go 1.23+).
- Configure: Set up `protoflow.Config`.
- Launch: Create a `Service`, register your handlers, and `Start`.
```go
// 1. Configure your transport (Kafka, RabbitMQ, AWS, NATS, HTTP, IO, Channel)
cfg := &protoflow.Config{
	PubSubSystem: "channel", // Use in-memory channel for testing
	PoisonQueue:  "orders.poison",
}

// 2. Use your preferred logger (slog, logrus, zerolog, etc.)
logger := protoflow.NewSlogServiceLogger(slog.Default())
svc := protoflow.NewService(cfg, logger, ctx, protoflow.ServiceDependencies{})

// 3. Register a strongly-typed handler
must(protoflow.RegisterProtoHandler(svc, protoflow.ProtoHandlerRegistration[*models.OrderCreated]{
	Name:         "orders-created",
	ConsumeQueue: "orders.created",
	Handler: func(ctx context.Context, evt protoflow.ProtoMessageContext[*models.OrderCreated]) ([]protoflow.ProtoMessageOutput, error) {
		evt.Logger.Info("Order received", protoflow.LogFields{"id": evt.Payload.OrderId})
		return nil, nil
	},
}))

// 4. Start the service
go func() { _ = svc.Start(ctx) }()
```
Want to emit events? Need metadata handling? Check out the Handlers Guide.
| Transport | Config Value | Use Case |
|---|---|---|
| Go Channels | `"channel"` | Testing, local development |
| Kafka | `"kafka"` | High-throughput streaming |
| RabbitMQ | `"rabbitmq"` | Traditional message queuing |
| AWS SNS/SQS | `"aws"` | Cloud-native pub/sub |
| NATS | `"nats"` | High-performance messaging |
| HTTP | `"http"` | Request/response patterns |
| File I/O | `"io"` | Simple message persistence |
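
Because the transport is chosen by a single string, it can be convenient to read that string from the environment and reject typos before building the service. The following is a minimal sketch using only the standard library: `resolveTransport` and the `PUBSUB_SYSTEM` variable name are hypothetical and not part of Protoflow, and the set of accepted values is copied from the table above.

```go
package main

import (
	"fmt"
	"os"
)

// knownTransports mirrors the "Config Value" column of the transport table.
var knownTransports = map[string]bool{
	"channel":  true,
	"kafka":    true,
	"rabbitmq": true,
	"aws":      true,
	"nats":     true,
	"http":     true,
	"io":       true,
}

// resolveTransport is a hypothetical helper (not a Protoflow API).
// It falls back to the in-memory "channel" transport when value is empty
// and returns an error for any value that is not in the table.
func resolveTransport(value string) (string, error) {
	if value == "" {
		return "channel", nil
	}
	if !knownTransports[value] {
		return "", fmt.Errorf("unknown transport %q", value)
	}
	return value, nil
}

func main() {
	// PUBSUB_SYSTEM is an assumed variable name chosen for this sketch.
	name, err := resolveTransport(os.Getenv("PUBSUB_SYSTEM"))
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("transport:", name)
}
```

The resolved name would then be assigned to the `PubSubSystem` field of `protoflow.Config`, as in the quick-start snippet.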
- Correlation ID: Injects tracing IDs into message metadata
- Message Logging: Debug logging with payload and metadata
- Proto Validation: Schema validation for protobuf messages
- Outbox Pattern: Reliable message delivery via OutboxStore
- OpenTelemetry Tracing: Distributed tracing with span propagation
- Prometheus Metrics: Request counts and latencies
- Retry with Backoff: Configurable exponential backoff
- Poison Queue: Dead letter queue for unprocessable messages
- Panic Recovery: Converts panics to errors for graceful handling
`protoflow.ServiceLogger` unifies logging across the router, middleware, and transports. Wrap your favorite logger and pass it to `NewService`:
- `protoflow.NewSlogServiceLogger`: Adapts `log/slog` (standard library).
- `protoflow.NewEntryServiceLogger`: Adapts structured loggers (logrus, zerolog) via `EntryLoggerAdapter[T]`.
```go
svc := protoflow.NewService(cfg,
	protoflow.NewEntryServiceLogger(myLogrusEntry),
	ctx,
	protoflow.ServiceDependencies{},
)
```
Protoflow also provides `TryNewService`, which returns an error from service creation for you to handle:
```go
svc, err := protoflow.TryNewService(cfg, logger, ctx, deps)
if err != nil {
	// Handle protoflow.ConfigValidationError, etc.
}
```
- Handlers Guide: Typed handlers, metadata, and publishing.
- Configuration Guide: Transports, middleware, and dependency injection.
- Development Guide: Local development, testing, and Taskfile workflow.
Check out examples/ for runnable code:
- `simple`: Basic Protoflow usage with Go Channels.
- `json`: Typed JSON handlers with metadata enrichment.
- `proto`: Protobuf handlers with generated models.
- `full`: Comprehensive example with custom middleware, validators, and outbox.
Run them with: `go run ./examples/<name>`
We use task (Taskfile) to manage development:
- `task lint`: Run golangci-lint
- `task test`: Run full test suite
See the Development Guide for details.
- Fork the repo and branch from `main`.
- Run `task lint` and `task test` before opening a PR.
- Add or update docs in `docs/` for new features.
- Keep commits focused with context in PR descriptions.
Protoflow is available under the MIT License.