Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 37 additions & 19 deletions src/moonlink/src/storage/mooncake_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ use arrow_schema::Schema;
use delete_vector::BatchDeletionVector;
pub(crate) use disk_slice::DiskSliceWriter;
use mem_slice::MemSlice;
use more_asserts as ma;
pub(crate) use snapshot::SnapshotTableState;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use table_snapshot::{IcebergSnapshotImportResult, IcebergSnapshotIndexMergeResult};
Expand Down Expand Up @@ -462,7 +463,8 @@ pub struct MooncakeTable {
wal_manager: WalManager,

/// LSN of ongoing flushes.
pub ongoing_flush_lsns: BTreeSet<u64>,
/// Maps each LSN to the number of ongoing flushes at that LSN.
pub ongoing_flush_lsns: BTreeMap<u64, u32>,

/// Table replay sender.
event_replay_tx: Option<mpsc::UnboundedSender<MooncakeTableEvent>>,
Expand Down Expand Up @@ -565,7 +567,7 @@ impl MooncakeTable {
last_iceberg_snapshot_lsn,
table_notify: None,
wal_manager,
ongoing_flush_lsns: BTreeSet::new(),
ongoing_flush_lsns: BTreeMap::new(),
event_replay_tx: None,
})
}
Expand Down Expand Up @@ -840,15 +842,20 @@ impl MooncakeTable {
}

/// Flushes the disk slice for the transaction.
///
/// # Arguments
///
/// * `ongoing_flush_count`: amount added to the ongoing flush count tracked for the disk slice's LSN.
fn flush_disk_slice(
&mut self,
disk_slice: &mut DiskSliceWriter,
table_notify_tx: Sender<TableEvent>,
xact_id: Option<u32>,
ongoing_flush_count: u32,
event_id: uuid::Uuid,
) {
if let Some(lsn) = disk_slice.lsn() {
self.insert_ongoing_flush_lsn(lsn);
self.insert_ongoing_flush_lsn(lsn, ongoing_flush_count);
} else {
assert!(
xact_id.is_some(),
Expand Down Expand Up @@ -915,31 +922,41 @@ impl MooncakeTable {
/// Records `lsn` as the new flush LSN of the next snapshot task, provided it is
/// strictly below the minimum ongoing flush LSN.
///
/// When `lsn` is not below that minimum, the snapshot task is left untouched.
///
/// # Panics
///
/// Panics if a flush LSN was already recorded and it is greater than `lsn`
/// (checked by `ma::assert_le!`).
fn try_set_next_flush_lsn(&mut self, lsn: u64) {
let min_pending_lsn = self.get_min_ongoing_flush_lsn();
if lsn < min_pending_lsn {
// The recorded flush LSN must never move backwards.
if let Some(old_flush_lsn) = self.next_snapshot_task.new_flush_lsn {
ma::assert_le!(old_flush_lsn, lsn);
}
self.next_snapshot_task.new_flush_lsn = Some(lsn);
}
}

// We fall back to u64::MAX when there are no pending flush LSNs, so that the returned LSN is always greater than the flush LSN and the iceberg snapshot can proceed.
pub fn get_min_ongoing_flush_lsn(&self) -> u64 {
self.ongoing_flush_lsns
.iter()
.next()
.copied()
.unwrap_or(u64::MAX)
if let Some((lsn, _)) = self.ongoing_flush_lsns.first_key_value() {
return *lsn;
}
u64::MAX
}
Comment on lines 933 to 938

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This function can be simplified and made more idiomatic by calling `map` and `unwrap_or` on the `Option` returned by `first_key_value`.

Suggested change
pub fn get_min_ongoing_flush_lsn(&self) -> u64 {
self.ongoing_flush_lsns
.iter()
.next()
.copied()
.unwrap_or(u64::MAX)
if let Some((lsn, _)) = self.ongoing_flush_lsns.first_key_value() {
return *lsn;
}
u64::MAX
}
pub fn get_min_ongoing_flush_lsn(&self) -> u64 {
self.ongoing_flush_lsns
.first_key_value()
.map(|(lsn, _)| *lsn)
.unwrap_or(u64::MAX)
}


pub fn insert_ongoing_flush_lsn(&mut self, lsn: u64) {
assert!(
self.ongoing_flush_lsns.insert(lsn),
"LSN {lsn} already in pending flush LSNs"
);
/// Adds `count` to the ongoing-flush counter tracked for `lsn`.
///
/// An LSN that is not tracked yet starts from a counter of zero before
/// `count` is added.
pub fn insert_ongoing_flush_lsn(&mut self, lsn: u64, count: u32) {
*self.ongoing_flush_lsns.entry(lsn).or_insert(0) += count;
}

pub fn remove_ongoing_flush_lsn(&mut self, lsn: u64) {
assert!(
self.ongoing_flush_lsns.remove(&lsn),
"LSN {lsn} not found in pending flush LSNs"
);
use std::collections::btree_map::Entry;

match self.ongoing_flush_lsns.entry(lsn) {
Entry::Occupied(mut entry) => {
let counter = entry.get_mut();
if *counter > 1 {
*counter -= 1;
} else {
entry.remove();
}
}
Entry::Vacant(_) => {
panic!("Tried to remove LSN {lsn}, but it is not tracked");
}
}
}

pub fn has_ongoing_flush(&self) -> bool {
Expand Down Expand Up @@ -1205,7 +1222,7 @@ impl MooncakeTable {
);

let table_notify_tx = self.table_notify.as_ref().unwrap().clone();
if self.mem_slice.is_empty() || self.ongoing_flush_lsns.contains(&lsn) {
if self.mem_slice.is_empty() || self.ongoing_flush_lsns.contains_key(&lsn) {
self.try_set_next_flush_lsn(lsn);
tokio::task::spawn(async move {
table_notify_tx
Expand Down Expand Up @@ -1238,6 +1255,7 @@ impl MooncakeTable {
&mut disk_slice,
table_notify_tx,
/*xact_id=*/ None,
/*ongoing_flush_count=*/ 1,
event_id,
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub(crate) async fn flush_table_and_sync_no_apply(
}
}

/// Flush mooncake, block wait its completion.
/// Flush the given streaming transaction and block until it completes.
#[cfg(test)]
pub(crate) async fn flush_stream_and_sync_no_apply(
table: &mut MooncakeTable,
Expand Down
Loading