Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 58 additions & 39 deletions docs/developer/benchmark_tool/state_store.md
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
ss_bench is used to benchmark the performance of the state store. In this doc, we first show a usage example and then describe each provided parameter.
`ss_bench` is used to benchmark the performance of the state store. In this doc, we first show a usage example and then describe each provided parameter.

# Usage Example

```shell
~/code/risingwave/rust: cargo run --bin ss-bench --\
--benchmarks "writebatch,prefixscanrandom,getrandom"\
--kvs-per-batch 1000\
--iterations 100\
~/code/risingwave/rust: cargo run --bin ss-bench -- \
--benchmarks "writebatch,getseq,getrandom,prefixscanrandom" \
--batch-size 1000 \
--reads 500 \
--concurrency-num 4
```

# Parameters

## State Store

### Type (`--store`)
### Backend Types (`--store`)

- `In-memory`

- Format: `in-memory`(or `in_memory`)
- Default value
- Default

- `Hummock+MinIO`

Expand Down Expand Up @@ -49,74 +49,93 @@ ss_bench is used to benchmark the performance of the state store. In this doc, w
- `--table-size-mb`

- Size (MB) of an SSTable
- Default value: 256
- Default: 256

- `--block-size-kb`

- Size (KB) of a block in an SSTable
- Default value: 64
- Default: 64

- `--bloom-false-positive`

- Bloom Filter false positive rate
- Default value: 0.1
- Default: 0.1

- `--checksum-algo`

- Checksum algorithm

- Options:

- `crc32c`: default value
- `crc32c`: default
- `xxhash`

## Benchmarks
## Operations

### Number (`--iterations`)
### Concurrency Number (`--concurrency-num`)

- The times that each benchmark has been executed
- Default value: 10
- Number of concurrent tasks used for each operation. The workload is divided almost evenly among the tasks.
- Default: 1

### Concurrency Number (`--concurrency-num`)
### Operation Types (`--benchmarks`)

Comma-separated list of operations to run in the specified order. Following operations are supported:

- `writebatch`: write N key/values in sequential key order in async mode.
- `getrandom`: read N times in random order.
- `getseq`: read N times sequentially.
- `prefixscanrandom`: prefix scan N times in random order.

Example: `--benchmarks "writebatch,prefixscanrandom,getrandom"`

### Operation Numbers

- `--num`

- The concurrency number of each benchmark
- Default value: 1
- Number of key/value pairs to place in the database.
- Default: 1000000

### Benchmark Type (`--benchmarks`)
- `--deletes`

- `writebatch`: write `iterations` KV pairs in sequential key order in async mode
- `getrandom`: read `iterations` times in random order
- `getseq`: read `iterations` times in sequential order
- `prefixscanrandom`: prefix scan `iterations` times in random order
- Number of keys to delete. If negative, perform `--num` deletions.
- Default: -1

The benchmarks could be a combination of multiple consequent benchmarks. Example: `--benchmarks "writebatch,prefixscanrandom,getrandom"`
- `--reads`

## Batch Configurations
- Number of keys to read. If negative, perform `--num` reads.
- Default: -1

- `--write_batches`

- Number of **written batches**.
- Default: 100

## Single Batch

- `--batch-size`

- Number of key/values in a batch.
- Default: 100

- `--key-size`

- The size (bytes) of the non-prefix part of a key
- Default value: 10
- Size (bytes) of each user_key (non-prefix part of a key).
- Default: 16

- `--key-prefix-size`

- The size (bytes) of a prefix
- Default value: 5
- Size (bytes) of each prefix.
- Default: 5

- `--key-prefix-frequency`
- `--keys_per_prefix`

- The number of keys with some a prefix in a batch
- Default value: 10
- Controls the **average** number of keys generated per prefix.
- Default: 10

- `--value-size`

- The length (bytes) of a value in a KV pair
- Default value: 10

- `--kvs-per-batch`

- The number of KV pairs in a batch
- Default value: 1000
- Size (bytes) of each value.
- Default: 100

# Metrics

Expand Down
93 changes: 63 additions & 30 deletions rust/bench/ss_bench/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::Arc;

mod benchmarks;
mod operations;
mod utils;

use benchmarks::*;
use clap::Parser;
use operations::*;
use risingwave_common::error::{Result, RwError};
use risingwave_pb::hummock::checksum::Algorithm as ChecksumAlg;
use risingwave_storage::hummock::local_version_manager::LocalVersionManager;
Expand Down Expand Up @@ -33,44 +33,54 @@ pub(crate) struct Opts {
#[clap(long, default_value = "in-memory")]
store: String,

// ----- Hummock -----
#[clap(long, default_value_t = 256)]
table_size_mb: u32,

#[clap(long, default_value_t = 64)]
block_size_kb: u32,

#[clap(long, default_value_t = 0.1)]
bloom_false_positive: f64,

#[clap(long, default_value = "crc32c")]
checksum_algo: String,

// ----- benchmarks -----
#[clap(long)]
benchmarks: String,

#[clap(long, default_value_t = 10)]
iterations: u32,

#[clap(long, default_value_t = 1)]
concurrency_num: u32,

// ----- Hummock -----
#[clap(long, default_value_t = 64)]
block_size_kb: u32,
// ----- operation number -----
#[clap(long, default_value_t = 1000000)]
num: i64,

#[clap(long, default_value_t = 256)]
table_size_mb: u32,
#[clap(long, default_value_t = -1)]
deletes: i64,

#[clap(long, default_value_t = 0.1)]
bloom_false_positive: f64,
#[clap(long, default_value_t = -1)]
reads: i64,

#[clap(long, default_value = "crc32c")]
checksum_algo: String,
#[clap(long, default_value_t = 100)]
write_batches: u64,

// ----- data -----
#[clap(long, default_value_t = 10)]
// ----- single batch -----
#[clap(long, default_value_t = 100)]
batch_size: u32,

#[clap(long, default_value_t = 16)]
key_size: u32,

#[clap(long, default_value_t = 5)]
key_prefix_size: u32,

#[clap(long, default_value_t = 10)]
key_prefix_frequency: u32,
keys_per_prefix: u32,

#[clap(long, default_value_t = 10)]
#[clap(long, default_value_t = 100)]
value_size: u32,

#[clap(long, default_value_t = 1000)]
kvs_per_batch: u32,
}

fn get_checksum_algo(algo: &str) -> ChecksumAlg {
Expand Down Expand Up @@ -156,9 +166,9 @@ async fn get_state_store_impl(opts: &Opts) -> Result<StateStoreImpl> {
Ok(instance)
}

async fn run_benchmarks(store: impl StateStore, opts: &Opts) {
for benchmark in opts.benchmarks.split(',') {
match benchmark {
async fn run_operations(store: impl StateStore, opts: &Opts) {
for operation in opts.benchmarks.split(',') {
match operation {
"writebatch" => write_batch::run(&store, opts).await,
"getrandom" => get_random::run(&store, opts).await,
"getseq" => get_seq::run(&store, opts).await,
Expand All @@ -168,13 +178,36 @@ async fn run_benchmarks(store: impl StateStore, opts: &Opts) {
}
}

/// Normalizes CLI options in place and validates operation-specific constraints.
///
/// Negative `--reads` / `--deletes` values are sentinels meaning "use `--num`",
/// so they are replaced with `opts.num` before any operation runs.
///
/// # Panics
///
/// Panics if the `getseq` operation is requested with a `--batch-size` smaller
/// than the (normalized) number of reads, since sequential gets index directly
/// into the written batch.
fn preprocess_options(opts: &mut Opts) {
    // A negative count means "default to --num".
    if opts.reads < 0 {
        opts.reads = opts.num;
    }
    if opts.deletes < 0 {
        opts.deletes = opts.num;
    }

    // check illegal configurations
    for operation in opts.benchmarks.split(',') {
        if operation == "getseq" {
            // TODO(sun ting): eliminate this limitation
            // Widen `batch_size` (u32) to i64 for the comparison instead of
            // narrowing `reads` with `as u32`: `reads` is an i64 (it defaults
            // to `--num`) and the old truncating cast made this check pass
            // incorrectly for reads > u32::MAX.
            if i64::from(opts.batch_size) < opts.reads {
                panic!(
                    "In sequential mode, `batch_size` should be greater than or equal to `reads`"
                );
            }
        }
    }
}

/// This is used to benchmark the state store performance.
/// For usage, see: https://github.com/singularity-data/risingwave-dev/blob/main/docs/developer/benchmark_tool/state_store.md
#[tokio::main(flavor = "multi_thread")]
async fn main() {
let opts = Opts::parse();
let mut opts = Opts::parse();

println!("Input configurations: {:?}", &opts);
println!("Configurations before preprocess:\n {:?}", &opts);
preprocess_options(&mut opts);
println!("Configurations after preprocess:\n {:?}", &opts);

let state_store = match get_state_store_impl(&opts).await {
Ok(state_store_impl) => state_store_impl,
Expand All @@ -185,9 +218,9 @@ async fn main() {
};

match state_store {
StateStoreImpl::Hummock(store) => run_benchmarks(store, &opts).await,
StateStoreImpl::Memory(store) => run_benchmarks(store, &opts).await,
StateStoreImpl::RocksDB(store) => run_benchmarks(store, &opts).await,
StateStoreImpl::Tikv(store) => run_benchmarks(store, &opts).await,
StateStoreImpl::Hummock(store) => run_operations(store, &opts).await,
StateStoreImpl::Memory(store) => run_operations(store, &opts).await,
StateStoreImpl::RocksDB(store) => run_operations(store, &opts).await,
StateStoreImpl::Tikv(store) => run_operations(store, &opts).await,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ pub(crate) async fn run(store: &impl StateStore, opts: &Opts) {

// generate queried point get key
let mut rng = StdRng::seed_from_u64(233);
let range = Uniform::from(0..opts.kvs_per_batch as usize);
let key_num = (opts.iterations - opts.iterations % opts.concurrency_num) as usize;
let mut get_keys = (0..key_num)
let dist = Uniform::from(0..opts.batch_size as usize);
let mut get_keys = (0..opts.reads)
.into_iter()
.map(|_| batch[range.sample(&mut rng)].0.clone())
.map(|_| batch[dist.sample(&mut rng)].0.clone())
.collect_vec();
let get_keys_len = get_keys.len();

// partitioned these keys for each concurrency
let mut grouped_keys = vec![vec![]; opts.concurrency_num as usize];
Expand Down Expand Up @@ -58,7 +58,7 @@ pub(crate) async fn run(store: &impl StateStore, opts: &Opts) {
}
}
let stat = LatencyStat::new(latencies);
let qps = key_num as u128 * 1_000_000_000 / total_time_nano as u128;
let qps = get_keys_len as u128 * 1_000_000_000 / total_time_nano as u128;

println!(
"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ pub(crate) async fn run(store: &impl StateStore, opts: &Opts) {
.unwrap();

// generate queried point get key
let key_num = (opts.iterations - opts.iterations % opts.concurrency_num) as usize;
let mut get_keys = (0..key_num)
let mut get_keys = (0..opts.reads as usize)
.into_iter()
.map(|i| batch[i].0.clone())
.collect_vec();
let get_keys_len = get_keys.len();

// partitioned these keys for each concurrency
let mut grouped_keys = vec![vec![]; opts.concurrency_num as usize];
Expand Down Expand Up @@ -53,7 +53,7 @@ pub(crate) async fn run(store: &impl StateStore, opts: &Opts) {
}
}
let stat = LatencyStat::new(latencies);
let qps = key_num as u128 * 1_000_000_000 / total_time_nano as u128;
let qps = get_keys_len as u128 * 1_000_000_000 / total_time_nano as u128;

println!(
"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ pub(crate) async fn run(store: &impl StateStore, opts: &Opts) {

// generate queried prefixes
let mut rng = StdRng::seed_from_u64(233);
let range = Uniform::from(0..workload.prefixes.len());
let prefix_num = (opts.iterations - opts.iterations % opts.concurrency_num) as usize;
let mut scan_prefixes = (0..prefix_num)
let dist = Uniform::from(0..workload.prefixes.len());
let mut scan_prefixes = (0..opts.reads)
.into_iter()
.map(|_| workload.prefixes[range.sample(&mut rng)].clone())
.map(|_| workload.prefixes[dist.sample(&mut rng)].clone())
.collect_vec();
let scan_prefixes_len = scan_prefixes.len();

// partitioned these prefixes for each concurrency
let mut grouped_prefixes = vec![vec![]; opts.concurrency_num as usize];
Expand Down Expand Up @@ -61,7 +61,7 @@ pub(crate) async fn run(store: &impl StateStore, opts: &Opts) {
}
}
let stat = LatencyStat::new(latencies);
let qps = prefix_num as u128 * 1_000_000_000 / total_time_nano as u128;
let qps = scan_prefixes_len as u128 * 1_000_000_000 / total_time_nano as u128;

println!(
"
Expand Down
Loading