48 changes: 48 additions & 0 deletions rust/storage/src/bummock/index/forward.rs
@@ -0,0 +1,48 @@
use std::collections::BTreeSet;

use parking_lot::Mutex;
use risingwave_common::error::Result;

use crate::hummock::cf::HummockColumnFamily;
use crate::hummock::HummockStorage;

type IndexEntry = (Vec<u8>, Option<Vec<u8>>);

#[derive(Debug)]
pub struct HummockForwardIndexer {
/// The column family to store the forward index.
cf: HummockColumnFamily,

/// Write buffer for index entries; they are persisted as a batch on `commit`.
wbuffer: Mutex<BTreeSet<IndexEntry>>,
}

/// Use Hummock storage as the forward indexing backend.
/// It is typically used for the primary key index.
/// A key is encoded as `| pk | epoch |`.
/// A value is encoded as a byte stream (e.g. jsonb encoded).
impl HummockForwardIndexer {
pub fn new(storage: HummockStorage, cf_name: Vec<u8>) -> Self {
let cf = HummockColumnFamily::new(storage, cf_name);

Self {
cf,
wbuffer: Mutex::new(BTreeSet::new()),
}
}

/// TODO(xiangyhu) multithreaded indexing should be supported.
pub fn index_entry(&self, k: &[u8], v: &[u8]) -> Result<()> {
self.wbuffer.lock().insert((k.to_vec(), Some(v.to_vec())));
Ok(())
}

/// Stamp an epoch and persist the forward index.
pub async fn commit(&self, epoch: u64) -> Result<()> {
self.cf
.put_batch(self.wbuffer.lock().to_owned(), epoch)
.await?;

Ok(())
}
}
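For reference, a minimal usage sketch of the forward indexer, assuming the crate is exposed as `risingwave_storage` and a `HummockStorage` handle is already available; the column family name and document payloads below are illustrative:

use risingwave_common::error::Result;
use risingwave_storage::bummock::HummockForwardIndexer;
use risingwave_storage::hummock::HummockStorage;

// Hypothetical helper: buffer a few documents and persist them at one epoch.
async fn index_documents(storage: HummockStorage) -> Result<()> {
    let indexer = HummockForwardIndexer::new(storage, b"pk_forward_index".to_vec());

    // Keys are primary keys; values are opaque encoded documents (e.g. jsonb).
    indexer.index_entry(b"doc-1", br#"{"title":"hello"}"#)?;
    indexer.index_entry(b"doc-2", br#"{"title":"world"}"#)?;

    // Stamp epoch 1 and flush the buffered batch into the column family.
    indexer.commit(1).await?;
    Ok(())
}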
84 changes: 84 additions & 0 deletions rust/storage/src/bummock/index/inverted.rs
@@ -0,0 +1,84 @@
use std::collections::BTreeSet;

use parking_lot::Mutex;
use risingwave_common::error::Result;

use crate::bummock::PK_SIZE;
use crate::hummock::cf::HummockColumnFamily;
use crate::hummock::HummockStorage;

type IndexEntry = (Vec<u8>, Option<Vec<u8>>);

#[derive(Debug)]
pub struct HummockInvertedIndexer {
/// The column family to store the inverted index.
cf: HummockColumnFamily,

/// Write buffer for index entries; they are persisted as a batch on `commit`.
wbuffer: Mutex<BTreeSet<IndexEntry>>,
}

/// Use Hummock storage as the inverted indexing backend.
/// Only Hummock keys are used; values are empty.
/// Each key is encoded as `| sk | pk | epoch |`.
impl HummockInvertedIndexer {
pub fn new(storage: HummockStorage, cf_name: Vec<u8>) -> Self {
let cf = HummockColumnFamily::new(storage, cf_name);

Self {
cf,
wbuffer: Mutex::new(BTreeSet::new()),
}
}

/// Index an `sk` and `pk` as an inverted index entry.
/// TODO(xiangyhu) multithreaded indexing should be supported.
pub fn index_entry(&self, sk: &[u8], pk: u64) -> Result<()> {
self.wbuffer
.lock()
.insert((self.compose_key(sk.to_vec(), pk), None));
Ok(())
}

/// Get an inverted index entry.
pub async fn get_entry(&self, _k: &[u8]) -> Result<(Vec<u8>, Option<Vec<u8>>)> {
todo!()
}

/// Stamp an epoch and persist the inverted index.
pub async fn commit(&self, epoch: u64) -> Result<()> {
self.cf
.put_batch(self.wbuffer.lock().to_owned(), epoch)
.await?;

Ok(())
}

/// Compose a secondary key and a primary key (document id) into a composite key.
pub fn compose_key(&self, mut sk: Vec<u8>, pk: u64) -> Vec<u8> {
// Append the document id in native-endian byte order after the secondary key.
sk.extend_from_slice(&pk.to_ne_bytes());
sk
}

/// Decompose a composite key into a secondary key and a primary key.
pub fn decompose_key(ck: &[u8]) -> (&[u8], &[u8]) {
let pos = ck
.len()
.checked_sub(PK_SIZE)
.expect("bad inverted index entry");
ck.split_at(pos)
}
}
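A small round-trip sketch of the composite-key encoding, with hypothetical values; `HummockInvertedIndexer` and `PK_SIZE` are re-exported through the `bummock` module:

use risingwave_storage::bummock::{HummockInvertedIndexer, PK_SIZE};

// Given an already-constructed indexer, compose `| sk | pk |` and split it back.
fn round_trip(indexer: &HummockInvertedIndexer) {
    let ck = indexer.compose_key(b"rust".to_vec(), 42);

    let (sk, pk_bytes) = HummockInvertedIndexer::decompose_key(&ck);
    assert_eq!(sk, b"rust");
    assert_eq!(pk_bytes.len(), PK_SIZE);
    // The document id is stored in native-endian byte order.
    assert_eq!(u64::from_ne_bytes(pk_bytes.try_into().unwrap()), 42);
}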
5 changes: 5 additions & 0 deletions rust/storage/src/bummock/index/mod.rs
@@ -0,0 +1,5 @@
pub use forward::*;
pub use inverted::*;

mod forward;
mod inverted;
33 changes: 33 additions & 0 deletions rust/storage/src/bummock/iter/index_iterator.rs
@@ -0,0 +1,33 @@
use bytes::Bytes;
use risingwave_common::error::Result;

use crate::hummock::iterator::UserIterator;

/// `HummockIndexIterator` iterates over the keys of Hummock storage, since index entries are
/// encoded in the keys only. Every key yielded by this iterator is a composite key of a
/// secondary key and a primary key.
pub struct HummockIndexIterator<'a>(UserIterator<'a>);

impl<'a> HummockIndexIterator<'a> {
async fn next(&mut self) -> Result<Option<Bytes>> {
let iter = &mut self.0;

if iter.is_valid() {
let k = Bytes::copy_from_slice(iter.key());
iter.next().await?;
Ok(Some(k))
} else {
Ok(None)
}
}

async fn rewind(&mut self) -> Result<()> {
self.0.rewind().await?;
Ok(())
}

async fn seek(&mut self, key: &[u8]) -> Result<()> {
self.0.seek(key).await?;
Ok(())
}
}
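A hedged sketch of how this iterator could drive an inverted-index scan, assuming the methods above are made public and an iterator over the index column family has already been constructed elsewhere:

use risingwave_common::error::Result;
use risingwave_storage::bummock::{HummockIndexIterator, HummockInvertedIndexer};

// Hypothetical scan: collect the document ids stored under one secondary key.
async fn collect_doc_ids(mut iter: HummockIndexIterator<'_>, sk: &[u8]) -> Result<Vec<u64>> {
    iter.seek(sk).await?;
    let mut ids = Vec::new();
    while let Some(ck) = iter.next().await? {
        let (key_sk, pk_bytes) = HummockInvertedIndexer::decompose_key(&ck);
        if key_sk != sk {
            break; // Moved past the requested secondary key.
        }
        ids.push(u64::from_ne_bytes(pk_bytes.try_into().expect("bad index key")));
    }
    Ok(ids)
}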
3 changes: 3 additions & 0 deletions rust/storage/src/bummock/iter/mod.rs
@@ -0,0 +1,3 @@
pub use index_iterator::*;

mod index_iterator;
15 changes: 15 additions & 0 deletions rust/storage/src/bummock/mod.rs
@@ -0,0 +1,15 @@
//! The implementation for Bummock indexing storage.

#![allow(dead_code)]

use std::sync::atomic::AtomicU64;

pub use index::*;
pub use iter::*;

mod index;
mod iter;

pub type AtomicDocumentId = AtomicU64;
pub type DocumentId = u64;
pub const PK_SIZE: usize = std::mem::size_of::<DocumentId>();
36 changes: 36 additions & 0 deletions rust/storage/src/hummock/cf.rs
@@ -0,0 +1,36 @@
use std::collections::BTreeSet;

use risingwave_common::error::Result;

use crate::hummock::HummockStorage;

/// `HummockColumnFamily` provides physical isolation of storage files.
#[derive(Clone, Debug)]
pub struct HummockColumnFamily {
/// The underlying `HummockStorage` used to store the data.
storage: HummockStorage,

/// The encoded column family name.
cf_name: Vec<u8>,
}

impl HummockColumnFamily {
pub fn new(storage: HummockStorage, cf_name: Vec<u8>) -> Self {
Self { storage, cf_name }
}

pub fn name(&self) -> &[u8] {
&self.cf_name
}

/// Put a batch of key-value pairs into this column family.
pub async fn put_batch(
&self,
batch: BTreeSet<(Vec<u8>, Option<Vec<u8>>)>,
epoch: u64,
) -> Result<()> {
let cf_batch = batch.into_iter().map(|(k, v)| (k, v.into()));
self.storage.write_batch(cf_batch, epoch).await?;
Ok(())
}
}
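A brief usage sketch of `put_batch`, assuming a `HummockColumnFamily` handle already exists; keys, values, and the epoch are illustrative:

use std::collections::BTreeSet;

use risingwave_common::error::Result;
use risingwave_storage::hummock::cf::HummockColumnFamily;

// Write two entries at epoch 7; the value is optional, and the inverted
// indexer stores key-only entries with `None`.
async fn write_small_batch(cf: &HummockColumnFamily) -> Result<()> {
    let mut batch = BTreeSet::new();
    batch.insert((b"k1".to_vec(), Some(b"v1".to_vec())));
    batch.insert((b"k2".to_vec(), None));
    cf.put_batch(batch, 7).await
}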
2 changes: 1 addition & 1 deletion rust/storage/src/hummock/iterator/mod.rs
@@ -12,7 +12,7 @@ pub use reverse_user::*;
mod merge;
pub use merge::*;
mod merge_inner;
mod user;
pub mod user;
pub use user::*;

#[cfg(test)]
3 changes: 3 additions & 0 deletions rust/storage/src/hummock/mock/mod.rs
@@ -1,4 +1,7 @@
#[allow(dead_code)]
mod mock_hummock_meta_client;

#[allow(unused_imports)]
pub(crate) use mock_hummock_meta_client::*;

mod mock_hummock_meta_service;
10 changes: 9 additions & 1 deletion rust/storage/src/hummock/mod.rs
@@ -1,5 +1,6 @@
//! Hummock is the state store of the streaming system.

use std::fmt;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;
@@ -8,11 +9,12 @@ use itertools::Itertools;

mod sstable;
pub use sstable::*;
pub mod cf;
mod cloud;
pub mod compactor;
mod error;
pub mod hummock_meta_client;
mod iterator;
pub(crate) mod iterator;
pub mod key;
pub mod key_range;
pub mod local_version_manager;
@@ -508,3 +510,9 @@ impl HummockStorage {
self.local_version_manager.wait_epoch(epoch).await;
}
}

impl fmt::Debug for HummockStorage {
fn fmt(&self, _f: &mut fmt::Formatter<'_>) -> fmt::Result {
todo!()
}
}
4 changes: 4 additions & 0 deletions rust/storage/src/hummock/state_store.rs
@@ -21,6 +21,10 @@ impl HummockStateStore {
pub fn new(storage: HummockStorage) -> Self {
Self { storage }
}

pub fn storage(&self) -> HummockStorage {
self.storage.clone()
}
}

// Note(eric): How about removing HummockStateStore and just impl StateStore for HummockStorage?
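The new `storage()` accessor is what lets higher layers hand a `HummockStorage` clone to the Bummock indexers; a minimal wiring sketch, with the module paths and column family name assumed:

use risingwave_storage::bummock::HummockInvertedIndexer;
use risingwave_storage::hummock::{HummockStateStore, HummockStorage};

fn build_title_indexer(state_store: &HummockStateStore) -> HummockInvertedIndexer {
    // `storage()` returns a clone of the underlying `HummockStorage`.
    let storage: HummockStorage = state_store.storage();
    HummockInvertedIndexer::new(storage, b"title_inverted_index".to_vec())
}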
1 change: 1 addition & 0 deletions rust/storage/src/lib.rs
Expand Up @@ -18,6 +18,7 @@

use risingwave_common::types::DataType;

pub mod bummock;
pub mod hummock;
pub mod keyspace;
pub mod memory;
28 changes: 25 additions & 3 deletions rust/storage/src/table/simple_manager.rs
@@ -13,9 +13,7 @@ use super::{ScannableTableRef, TableManager};
use crate::table::mview::MViewTable;
use crate::{dispatch_state_store, Keyspace, StateStoreImpl, TableColumnDesc};

/// A simple implementation of in memory table for local tests.
/// It will be replaced in near future when replaced by locally
/// on-disk files.
/// Manages all tables in the storage backend.
pub struct SimpleTableManager {
// TODO: should not use `std::sync::Mutex` in async context.
tables: Mutex<HashMap<TableId, ScannableTableRef>>,
@@ -54,6 +52,30 @@ impl TableManager for SimpleTableManager {
Ok(table)
}

// async fn create_table_on_collection(
// &self,
// table_id: &CollectionId,
// table_columns: Vec<TableColumnDesc>,
// ) -> Result<Option<ScannableTableRef>> {
// let mut tables = self.lock_tables();

// ensure!(
// !tables.contains_key(table_id),
// "Table id already exists: {:?}",
// table_id
// );

// if let StateStoreImpl::HummockStateStore(hummock_state_store) = &self.state_store {
// let storage = hummock_state_store.storage();
// let collection = Collection::new_relation(storage, table_id, table_columns);
// let table = Arc::new(collection);
// tables.insert(table_id.clone(), table.clone());
// Ok(Some(table))
// } else {
// Ok(None)
// }
// }

fn get_table(&self, table_id: &TableId) -> Result<ScannableTableRef> {
let tables = self.lock_tables();
tables