ETL

Stream Postgres changes anywhere, in real-time

ETL is a Rust framework for building change data capture (CDC) pipelines on Postgres. Stream inserts, updates, and deletes to BigQuery, Apache Iceberg, or your own custom destinations.

Start Here

| Your background | Recommended path |
| --- | --- |
| New to Postgres logical replication | Postgres Replication Concepts |
| Ready to build | Your First Pipeline (15 min) |
| Need custom destinations | Custom Stores and Destinations (30 min) |
| Setting up Postgres | Configure Postgres |

Why ETL?

  • Real-time: Changes stream as they happen, not in batches
  • Reliable: At-least-once delivery with automatic retries
  • Extensible: Implement one trait to add any destination
  • Fast: Parallel initial copy, configurable batching
  • Type-safe: Rust API with compile-time guarantees

How It Works

  1. Initial copy: ETL copies existing table data to your destination
  2. Streaming: ETL streams events (Insert, Update, Delete, and more) in real-time
  3. Recovery: The store persists state so pipelines resume after restarts

See Architecture for details.
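The "implement one trait" idea from the steps above can be illustrated with a standalone sketch. Note that the `Destination` trait, `Event` enum, and `MemoryDestination` below are simplified stand-ins defined locally for illustration; the real trait lives in the `etl` crate and its actual signature differs (the crate's API is async, for one).

```rust
// Hypothetical, simplified stand-ins for illustration only.
// The real `etl` crate's Destination trait is async and richer than this.

#[derive(Debug, Clone)]
enum Event {
    Insert(String),
    Update(String),
    Delete(String),
}

/// A destination receives batches of change events and persists them.
trait Destination {
    fn write_batch(&mut self, events: Vec<Event>) -> Result<(), String>;
}

/// An in-memory destination: just buffers events in a Vec.
#[derive(Default)]
struct MemoryDestination {
    events: Vec<Event>,
}

impl Destination for MemoryDestination {
    fn write_batch(&mut self, events: Vec<Event>) -> Result<(), String> {
        self.events.extend(events);
        Ok(())
    }
}

fn main() {
    let mut dest = MemoryDestination::default();
    dest.write_batch(vec![
        Event::Insert("row 1".into()),
        Event::Update("row 1".into()),
    ])
    .unwrap();
    println!("stored {} events", dest.events.len()); // prints "stored 2 events"
}
```

Swapping in BigQuery, Iceberg, or a custom sink is then a matter of providing a different implementation of the same trait.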

Quick Example

Add ETL to your project:

```toml
[dependencies]
etl = { git = "https://github.com/supabase/etl" }
tokio = { version = "1.0", features = ["full"] }
```
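Because `etl` is pulled straight from git, you may want to pin a specific revision for reproducible builds. A possible variant (the `rev` value is a placeholder; substitute a real commit SHA or tag):

```toml
[dependencies]
# Pin a commit so builds don't change when the default branch moves.
etl = { git = "https://github.com/supabase/etl", rev = "<commit-sha>" }
```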

Create a pipeline:

```rust
use etl::{
    config::{BatchConfig, PgConnectionConfig, PipelineConfig, TlsConfig},
    destination::memory::MemoryDestination,
    pipeline::Pipeline,
    store::both::memory::MemoryStore,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pg_config = PgConnectionConfig {
        host: "localhost".to_string(),
        port: 5432,
        name: "mydb".to_string(),
        username: "postgres".to_string(),
        password: Some("password".to_string().into()),
        tls: TlsConfig { enabled: false, trusted_root_certs: String::new() },
        keepalive: None,
    };

    let config = PipelineConfig {
        id: 1,
        publication_name: "my_publication".to_string(),
        pg_connection: pg_config,
        batch: BatchConfig { max_size: 1000, max_fill_ms: 5000 },
        table_error_retry_delay_ms: 10_000,
        table_error_retry_max_attempts: 5,
        max_table_sync_workers: 4,
    };

    let store = MemoryStore::new();
    let destination = MemoryDestination::new();

    let mut pipeline = Pipeline::new(config, store, destination);
    pipeline.start().await?;
    pipeline.wait().await?;

    Ok(())
}
```
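The example above assumes a publication named `my_publication` already exists on the source database. A minimal Postgres setup might look like this (using `FOR ALL TABLES` for simplicity; you can also publish specific tables):

```sql
-- Logical replication must be enabled (requires a server restart).
ALTER SYSTEM SET wal_level = logical;

-- Create the publication the pipeline subscribes to.
CREATE PUBLICATION my_publication FOR ALL TABLES;
```

See Configure Postgres for the full setup, including replication role permissions.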

Documentation

| Section | What you'll find |
| --- | --- |
| Guides | Step-by-step instructions to get things done |
| Explanations | Deep dives into concepts and architecture |

Contributing

Pull requests and issues welcome on GitHub.

New destinations: Open an issue first to gauge interest. Each built-in destination carries long-term maintenance cost, so we only accept those with significant community demand.