3 changes: 2 additions & 1 deletion Makefile.toml
@@ -109,7 +109,8 @@ condition = { env_set = [ "ENABLE_BUILD_RUST" ] }
script = '''
#!/bin/bash
set -e
cd rust && cargo build --profile ${RISINGWAVE_BUILD_PROFILE} ${RISEDEV_CARGO_BUILD_FLAGS}
echo "**Reminder**: risedev will only build risingwave_cmd crate"
cd rust && cargo build -p risingwave_cmd --profile "${RISINGWAVE_BUILD_PROFILE}" ${RISEDEV_CARGO_BUILD_FLAGS}
'''

[tasks.build-frontend]
169 changes: 118 additions & 51 deletions rust/storage/src/memory.rs
@@ -1,4 +1,5 @@
use std::collections::{btree_map, BTreeMap};
use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
@@ -11,10 +12,18 @@ use tokio::sync::Mutex;

use crate::{StateStore, StateStoreIter};

type KeyWithEpoch = (Bytes, Reverse<u64>);

/// An in-memory state store
///
/// The in-memory state store is a [`BTreeMap`], which maps (key, epoch) to value. It never does
/// GC, so memory usage will be high. Moreover, every time we create a new iterator on the
/// `BTreeMap`, it fully clones the map to act as a snapshot. Therefore, the in-memory state
/// store should never be used in production.
#[derive(Clone)]
pub struct MemoryStateStore {
inner: Arc<Mutex<BTreeMap<Bytes, Bytes>>>,
/// store (key, epoch) -> value
inner: Arc<Mutex<BTreeMap<KeyWithEpoch, Option<Bytes>>>>,
}
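
A note on the key ordering: `std::cmp::Reverse` flips the comparison on the epoch component, so for the same key the entry with the largest epoch sorts first, and a forward range scan meets the newest version of each key before any older one. A minimal standalone sketch of this ordering, with `&str` standing in for `Bytes`:

```rust
use std::cmp::Reverse;
use std::collections::BTreeMap;

fn main() {
    // Keys are (key, Reverse(epoch)): for the same key, a larger epoch
    // sorts earlier, so a forward scan sees the newest version first.
    let mut map = BTreeMap::new();
    map.insert(("a", Reverse(0u64)), "v1");
    map.insert(("a", Reverse(1u64)), "v2");
    map.insert(("b", Reverse(0u64)), "v1");

    let keys: Vec<_> = map.keys().copied().collect();
    assert_eq!(
        keys,
        vec![("a", Reverse(1)), ("a", Reverse(0)), ("b", Reverse(0))]
    );
}
```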

impl Default for MemoryStateStore {
@@ -23,19 +32,19 @@ impl Default for MemoryStateStore {
}
}

fn to_bytes_range<R, B>(range: R) -> (Bound<Bytes>, Bound<Bytes>)
fn to_bytes_range<R, B>(range: R) -> (Bound<KeyWithEpoch>, Bound<KeyWithEpoch>)
where
R: RangeBounds<B> + Send,
B: AsRef<[u8]>,
{
let start = match range.start_bound() {
Included(k) => Included(Bytes::copy_from_slice(k.as_ref())),
Excluded(k) => Excluded(Bytes::copy_from_slice(k.as_ref())),
Included(k) => Included((Bytes::copy_from_slice(k.as_ref()), Reverse(u64::MAX))),
Excluded(k) => Excluded((Bytes::copy_from_slice(k.as_ref()), Reverse(0))),
Unbounded => Unbounded,
};
let end = match range.end_bound() {
Included(k) => Included(Bytes::copy_from_slice(k.as_ref())),
Excluded(k) => Excluded(Bytes::copy_from_slice(k.as_ref())),
Included(k) => Included((Bytes::copy_from_slice(k.as_ref()), Reverse(0))),
Excluded(k) => Excluded((Bytes::copy_from_slice(k.as_ref()), Reverse(u64::MAX))),
Unbounded => Unbounded,
};
(start, end)
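
The bound translation follows directly from that ordering: for a fixed key, `(key, Reverse(u64::MAX))` is the smallest possible pair and `(key, Reverse(0))` the largest. An inclusive key bound is therefore widened to cover every epoch of that key, while an exclusive bound is placed so that all epochs of the key fall outside the range. A small sketch under the same `&str` simplification:

```rust
use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Included, Unbounded};

fn main() {
    let mut map = BTreeMap::new();
    for epoch in [0u64, 1, 2] {
        map.insert(("b", Reverse(epoch)), ());
    }
    // Inclusive start: pair the key with Reverse(u64::MAX), the smallest
    // pair for "b", so all three epochs fall inside the range.
    let from_b = map
        .range((Included(("b", Reverse(u64::MAX))), Unbounded))
        .count();
    assert_eq!(from_b, 3);
    // Exclusive start: pair the key with Reverse(0), the largest pair
    // for "b", so every epoch of "b" falls outside the range.
    let after_b = map
        .range((Excluded(("b", Reverse(0))), Unbounded))
        .count();
    assert_eq!(after_b, 0);
}
```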
@@ -55,14 +64,14 @@ impl MemoryStateStore {
STORE.clone()
}

async fn ingest_batch_inner(&self, kv_pairs: Vec<(Bytes, Option<Bytes>)>) -> Result<()> {
async fn ingest_batch_inner(
&self,
kv_pairs: Vec<(Bytes, Option<Bytes>)>,
epoch: u64,
) -> Result<()> {
let mut inner = self.inner.lock().await;
for (key, value) in kv_pairs {
if let Some(value) = value {
inner.insert(key, value);
} else {
inner.remove(&key);
}
inner.insert((key, Reverse(epoch)), value);
}
Ok(())
}
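
Worth highlighting: a delete no longer removes the entry. It records a `None` tombstone at the write epoch, so readers at older epochs still observe the previous value. An illustrative sketch (types simplified to `&str`):

```rust
use std::cmp::Reverse;
use std::collections::BTreeMap;

fn main() {
    let mut inner: BTreeMap<(&str, Reverse<u64>), Option<&str>> = BTreeMap::new();
    // Epoch 0 inserts "v1"; epoch 1 deletes the key. The delete keeps
    // the old entry and adds a `None` tombstone, so an epoch-0 reader
    // still sees "v1" while an epoch-1 reader sees the key as absent.
    inner.insert(("b", Reverse(0)), Some("v1"));
    inner.insert(("b", Reverse(1)), None);
    assert_eq!(inner.len(), 2);
}
```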
@@ -71,6 +80,7 @@ impl MemoryStateStore {
&self,
key_range: R,
limit: Option<usize>,
epoch: u64,
) -> Result<Vec<(Bytes, Bytes)>>
where
R: RangeBounds<B> + Send,
@@ -81,8 +91,18 @@ impl MemoryStateStore {
return Ok(vec![]);
}
let inner = self.inner.lock().await;
for (key, value) in inner.range(to_bytes_range(key_range)) {
data.push((key.clone(), value.clone()));

let mut last_key = None;
for ((key, Reverse(key_epoch)), value) in inner.range(to_bytes_range(key_range)) {
if *key_epoch > epoch {
continue;
}
if Some(key) != last_key.as_ref() {
if let Some(value) = value {
data.push((key.clone(), value.clone()));
}
last_key = Some(key.clone());
}
if let Some(limit) = limit && data.len() >= limit {
break;
}
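
In other words, the scan walks the versions of each key newest-first, keeps the first version whose epoch is at or below the read epoch, and treats a `None` value as a deletion that shadows older versions. A self-contained sketch of the same visibility rule (again with `&str` in place of `Bytes`; `scan_visible` is an illustrative name, not part of the crate):

```rust
use std::cmp::Reverse;
use std::collections::BTreeMap;

/// Return the newest version of each key visible at `epoch`.
fn scan_visible<'a>(
    map: &'a BTreeMap<(&'a str, Reverse<u64>), Option<&'a str>>,
    epoch: u64,
) -> Vec<(&'a str, &'a str)> {
    let mut data = vec![];
    let mut last_key = None;
    for (&(key, Reverse(key_epoch)), value) in map {
        if key_epoch > epoch {
            continue; // written after the snapshot we are reading
        }
        if Some(key) != last_key {
            if let Some(value) = value {
                data.push((key, *value));
            }
            last_key = Some(key); // older versions of `key` are shadowed
        }
    }
    data
}

fn main() {
    let mut map = BTreeMap::new();
    map.insert(("a", Reverse(0u64)), Some("v1"));
    map.insert(("a", Reverse(1u64)), Some("v2"));
    map.insert(("b", Reverse(0u64)), Some("v1"));
    map.insert(("b", Reverse(1u64)), None); // deleted at epoch 1
    assert_eq!(scan_visible(&map, 0), vec![("a", "v1"), ("b", "v1")]);
    assert_eq!(scan_visible(&map, 1), vec![("a", "v2")]);
}
```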
@@ -92,49 +112,41 @@ impl MemoryStateStore {

async fn reverse_scan_inner<R, B>(
&self,
key_range: R,
limit: Option<usize>,
_key_range: R,
_limit: Option<usize>,
) -> Result<Vec<(Bytes, Bytes)>>
where
R: RangeBounds<B> + Send,
B: AsRef<[u8]>,
{
let mut data = vec![];
if limit == Some(0) {
return Ok(vec![]);
}
let inner = self.inner.lock().await;
for (key, value) in inner.range(to_bytes_range(key_range)).rev() {
data.push((key.clone(), value.clone()));
if let Some(limit) = limit && data.len() >= limit {
break;
}
}
Ok(data)
todo!()
}
}

#[async_trait]
impl StateStore for MemoryStateStore {
type Iter<'a> = MemoryStateStoreIter;

async fn get(&self, key: &[u8], _epoch: u64) -> Result<Option<Bytes>> {
let inner = self.inner.lock().await;
let res = inner.get(key).cloned();
Ok(res)
async fn get(&self, key: &[u8], epoch: u64) -> Result<Option<Bytes>> {
let res = self.scan_inner(key..=key, Some(1), epoch).await?;
Ok(match res.as_slice() {
[] => None,
[(_, value)] => Some(value.clone()),
_ => unreachable!(),
})
}

async fn scan<R, B>(
&self,
key_range: R,
limit: Option<usize>,
_epoch: u64,
epoch: u64,
) -> Result<Vec<(Bytes, Bytes)>>
where
R: RangeBounds<B> + Send,
B: AsRef<[u8]>,
{
self.scan_inner(key_range, limit).await
self.scan_inner(key_range, limit, epoch).await
}

async fn reverse_scan<R, B>(
@@ -150,26 +162,21 @@ impl StateStore for MemoryStateStore {
self.reverse_scan_inner(key_range, limit).await
}

async fn ingest_batch(&self, kv_pairs: Vec<(Bytes, Option<Bytes>)>, _epoch: u64) -> Result<()> {
// TODO: actually use epoch and support rollback
self.ingest_batch_inner(kv_pairs).await
async fn ingest_batch(&self, kv_pairs: Vec<(Bytes, Option<Bytes>)>, epoch: u64) -> Result<()> {
self.ingest_batch_inner(kv_pairs, epoch).await
}

async fn iter<R, B>(&self, key_range: R, _epoch: u64) -> Result<Self::Iter<'_>>
async fn iter<R, B>(&self, key_range: R, epoch: u64) -> Result<Self::Iter<'_>>
where
R: RangeBounds<B> + Send,
B: AsRef<[u8]>,
{
#[allow(clippy::mutable_key_type)]
let snapshot: BTreeMap<_, _> = self
.inner
.lock()
.await
.range(to_bytes_range(key_range))
.map(|(k, v)| (k.to_owned(), v.to_owned()))
.collect();

Ok(MemoryStateStoreIter::new(snapshot.into_iter()))
Ok(MemoryStateStoreIter::new(
self.scan_inner(key_range, None, epoch)
.await
.unwrap()
.into_iter(),
))
}

async fn replicate_batch(
@@ -199,11 +206,11 @@ impl StateStore for MemoryStateStore {
}

pub struct MemoryStateStoreIter {
inner: btree_map::IntoIter<Bytes, Bytes>,
inner: std::vec::IntoIter<(Bytes, Bytes)>,
}

impl MemoryStateStoreIter {
fn new(iter: btree_map::IntoIter<Bytes, Bytes>) -> Self {
fn new(iter: std::vec::IntoIter<(Bytes, Bytes)>) -> Self {
Self { inner: iter }
}
}
@@ -216,3 +223,63 @@ impl StateStoreIter for MemoryStateStoreIter {
Ok(self.inner.next())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_snapshot_isolation() {
let state_store = MemoryStateStore::new();
state_store
.ingest_batch(
vec![
(b"a".to_vec().into(), Some(b"v1".to_vec().into())),
(b"b".to_vec().into(), Some(b"v1".to_vec().into())),
],
0,
)
.await
.unwrap();
state_store
.ingest_batch(
vec![
(b"a".to_vec().into(), Some(b"v2".to_vec().into())),
(b"b".to_vec().into(), None),
],
1,
)
.await
.unwrap();
assert_eq!(
state_store.scan("a"..="b", None, 0).await.unwrap(),
vec![
(b"a".to_vec().into(), b"v1".to_vec().into()),
(b"b".to_vec().into(), b"v1".to_vec().into())
]
);
assert_eq!(
state_store.scan("a"..="b", Some(1), 0).await.unwrap(),
vec![(b"a".to_vec().into(), b"v1".to_vec().into())]
);
assert_eq!(
state_store.scan("a"..="b", None, 1).await.unwrap(),
vec![(b"a".to_vec().into(), b"v2".to_vec().into())]
);
assert_eq!(
state_store.get(b"a", 0).await.unwrap(),
Some(b"v1".to_vec().into())
);
assert_eq!(
state_store.get(b"b", 0).await.unwrap(),
Some(b"v1".to_vec().into())
);
assert_eq!(state_store.get(b"c", 0).await.unwrap(), None);
assert_eq!(
state_store.get(b"a", 1).await.unwrap(),
Some(b"v2".to_vec().into())
);
assert_eq!(state_store.get(b"b", 1).await.unwrap(), None);
assert_eq!(state_store.get(b"c", 1).await.unwrap(), None);
}
}
5 changes: 4 additions & 1 deletion rust/storage/src/store_impl.rs
@@ -111,7 +111,10 @@ impl StateStoreImpl {
StateStoreImpl::HummockStateStore(inner.monitored(stats))
}

"in_memory" | "in-memory" => StateStoreImpl::shared_in_memory_store(stats.clone()),
"in_memory" | "in-memory" => {
tracing::warn!("in-memory state backend should never be used in benchmarks or production environments.");
StateStoreImpl::shared_in_memory_store(stats.clone())
}

tikv if tikv.starts_with("tikv") => {
let inner =
1 change: 0 additions & 1 deletion rust/stream/src/executor/lookup/tests.rs
@@ -257,7 +257,6 @@ async fn test_lookup_this_epoch() {
}

#[tokio::test]
#[ignore]
async fn test_lookup_last_epoch() {
// TODO: memory state store doesn't support read epoch yet, so this test won't pass for now.
// Will fix later.