Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions rust/stream/src/executor/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct TopNExecutor<S: StateStore> {
/// The input of the current executor
input: Box<dyn Executor>,
/// The ordering
order_types: Vec<OrderType>,
pk_order_types: Vec<OrderType>,
/// `LIMIT XXX`. `None` means no limit.
limit: Option<usize>,
/// `OFFSET XXX`. `0` means no offset.
Expand All @@ -47,7 +47,7 @@ impl<S: StateStore> std::fmt::Debug for TopNExecutor<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TopNExecutor")
.field("input", &self.input)
.field("order_types", &self.order_types)
.field("pk_order_types", &self.pk_order_types)
.field("limit", &self.limit)
.field("offset", &self.offset)
.field("pk_indices", &self.pk_indices)
Expand All @@ -59,56 +59,62 @@ impl<S: StateStore> TopNExecutor<S> {
/// Creates a `TopNExecutor` whose state is split across three managed sub-states
/// (lowest, middle, highest), each stored under its own fixed-length segment of
/// `keyspace` (`b"l"`, `b"m"`, `b"h"`).
///
/// NOTE(review): this text is a rendered diff without +/- markers; several
/// statements appear twice in a row (pre-change line, then post-change line),
/// e.g. `data_types` / `pk_data_types`, the two `OrderedRowDeserializer::new`
/// calls, and `first_execution: false` / `first_execution: true`.
///
/// * `order_types` / `pk_order_types` - order types of the pk columns; used to
///   build the `OrderedRowDeserializer` and stored on the executor.
/// * `offset_and_limit` - `(offset, limit)`; a `None` limit means no limit.
/// * `pk_indices` - positions of the pk columns in the input schema.
/// * `keyspace` - parent keyspace from which the three sub-keyspaces are derived.
/// * `cache_size` - forwarded unchanged to all three managed states.
/// * `total_count` - initial counts for the lowest, middle and highest states
///   (`.0`, `.1`, `.2` respectively).
/// * `executor_id` - rendered in uppercase hex into the executor's identity string.
#[allow(clippy::too_many_arguments)]
pub fn new(
input: Box<dyn Executor>,
order_types: Vec<OrderType>,
pk_order_types: Vec<OrderType>,
offset_and_limit: (usize, Option<usize>),
pk_indices: PkIndices,
keyspace: Keyspace<S>,
cache_size: Option<usize>,
total_count: (usize, usize, usize),
executor_id: u64,
) -> Self {
// Data types of the pk columns, looked up in the input schema via `pk_indices`.
let data_types = pk_indices
let pk_data_types = pk_indices
.iter()
.map(|idx| input.schema().fields[*idx].data_type())
.collect::<Vec<_>>();
// Data types of every input column; handed to each managed state below.
let row_data_types = input
.schema()
.fields
.iter()
.map(|field| field.data_type)
.collect::<Vec<_>>();
// Deserializer built from the pk column types and their order types;
// cloned into each of the three managed states.
let ordered_row_deserializer =
OrderedRowDeserializer::new(data_types.clone(), order_types.clone());
OrderedRowDeserializer::new(pk_data_types, pk_order_types.clone());
// One fixed-length keyspace segment per sub-state, presumably so that the
// keys of the three states never collide.
let lower_sub_keyspace = keyspace.with_segment(Segment::FixedLength(b"l".to_vec()));
let middle_sub_keyspace = keyspace.with_segment(Segment::FixedLength(b"m".to_vec()));
let higher_sub_keyspace = keyspace.with_segment(Segment::FixedLength(b"h".to_vec()));
// Lowest range: `ManagedTopNState` with `TOP_N_MAX`, seeded with `total_count.0`.
// NOTE(review): the meaning of the trailing `0` argument is not visible here.
let managed_lowest_state = ManagedTopNState::<S, TOP_N_MAX>::new(
cache_size,
total_count.0,
lower_sub_keyspace,
data_types.clone(),
row_data_types.clone(),
ordered_row_deserializer.clone(),
0,
);
// Middle range: `ManagedTopNBottomNState`, seeded with `total_count.1`.
let managed_middle_state = ManagedTopNBottomNState::new(
cache_size,
total_count.1,
middle_sub_keyspace,
data_types.clone(),
row_data_types.clone(),
ordered_row_deserializer.clone(),
);
// Highest range: `ManagedTopNState` with `TOP_N_MIN`, seeded with `total_count.2`;
// takes ownership of the remaining type vectors and the deserializer.
let managed_highest_state = ManagedTopNState::<S, TOP_N_MIN>::new(
cache_size,
total_count.2,
higher_sub_keyspace,
data_types,
row_data_types,
ordered_row_deserializer,
0,
);
Self {
input,
order_types,
pk_order_types,
offset: offset_and_limit.0,
limit: offset_and_limit.1,
managed_lowest_state,
managed_middle_state,
managed_highest_state,
pk_indices,
first_execution: false,
first_execution: true,
// Identity embeds `executor_id` formatted as uppercase hexadecimal.
identity: format!("TopNExecutor {:X}", executor_id),
}
}
Expand Down Expand Up @@ -165,7 +171,7 @@ impl<S: StateStore> TopNExecutorBase for TopNExecutor<S> {
.iter()
.map(|idx| row_ref.0[*idx].to_owned_datum())
.collect::<Vec<_>>());
let ordered_pk_row = OrderedRow::new(pk_row, &self.order_types);
let ordered_pk_row = OrderedRow::new(pk_row, &self.pk_order_types);
let row = row_ref.into();
match *op {
Op::Insert | Op::UpdateInsert => {
Expand Down
24 changes: 15 additions & 9 deletions rust/stream/src/executor/top_n_appendonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct AppendOnlyTopNExecutor<S: StateStore> {
/// The input of the current executor
input: Box<dyn Executor>,
/// The ordering
order_types: Vec<OrderType>,
pk_order_types: Vec<OrderType>,
/// `LIMIT XXX`. `None` means no limit.
limit: Option<usize>,
/// `OFFSET XXX`. `0` means no offset.
Expand All @@ -80,7 +80,7 @@ impl<S: StateStore> std::fmt::Debug for AppendOnlyTopNExecutor<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AppendOnlyTopNExecutor")
.field("input", &self.input)
.field("order_types", &self.order_types)
.field("pk_order_types", &self.pk_order_types)
.field("limit", &self.limit)
.field("offset", &self.offset)
.field("pk_indices", &self.pk_indices)
Expand All @@ -92,40 +92,46 @@ impl<S: StateStore> AppendOnlyTopNExecutor<S> {
#[allow(clippy::too_many_arguments)]
pub fn new(
input: Box<dyn Executor>,
order_types: Vec<OrderType>,
pk_order_types: Vec<OrderType>,
offset_and_limit: (usize, Option<usize>),
pk_indices: PkIndices,
keyspace: Keyspace<S>,
cache_size: Option<usize>,
total_count: (usize, usize),
executor_id: u64,
) -> Self {
let data_types = pk_indices
let pk_data_types = pk_indices
.iter()
.map(|idx| input.schema().fields[*idx].data_type())
.collect::<Vec<_>>();
let row_data_types = input
.schema()
.fields
.iter()
.map(|field| field.data_type)
.collect::<Vec<_>>();
let lower_sub_keyspace = keyspace.with_segment(Segment::FixedLength(b"l/".to_vec()));
let higher_sub_keyspace = keyspace.with_segment(Segment::FixedLength(b"h/".to_vec()));
let ordered_row_deserializer =
OrderedRowDeserializer::new(data_types.clone(), order_types.clone());
OrderedRowDeserializer::new(pk_data_types, pk_order_types.clone());
Self {
input,
order_types,
pk_order_types,
offset: offset_and_limit.0,
limit: offset_and_limit.1,
managed_lower_state: ManagedTopNState::<S, TOP_N_MAX>::new(
cache_size,
total_count.0,
lower_sub_keyspace,
data_types.clone(),
row_data_types.clone(),
ordered_row_deserializer.clone(),
0,
),
managed_higher_state: ManagedTopNState::<S, TOP_N_MAX>::new(
cache_size,
total_count.1,
higher_sub_keyspace,
data_types,
row_data_types,
ordered_row_deserializer,
0,
),
Expand Down Expand Up @@ -192,7 +198,7 @@ impl<S: StateStore> TopNExecutorBase for AppendOnlyTopNExecutor<S> {
.iter()
.map(|idx| row_ref.0[*idx].to_owned_datum())
.collect::<Vec<_>>());
let ordered_pk_row = OrderedRow::new(pk_row, &self.order_types);
let ordered_pk_row = OrderedRow::new(pk_row, &self.pk_order_types);
let row = row_ref.into();
if self.managed_lower_state.total_count() < self.offset {
// `elem` is in the range of `[0, offset)`,
Expand Down