Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,19 @@ impl DataChunk {
&self.vis2
}

/// Returns the ratio of visible rows in this chunk as an `f64`.
///
/// * For a compact chunk the ratio is always `1.0`.
/// * For a bitmap-backed chunk the ratio is the number of set bits divided by the bitmap
///   length. An empty bitmap yields `0.0` instead of performing the division
///   (presumably to avoid a `0 / 0` NaN result).
pub fn selectivity(&self) -> f64 {
    match &self.vis2 {
        Vis::Compact(_) => 1.0,
        Vis::Bitmap(bitmap) if bitmap.is_empty() => 0.0,
        Vis::Bitmap(bitmap) => {
            let visible = bitmap.count_ones() as f64;
            let total = bitmap.len() as f64;
            visible / total
        }
    }
}

pub fn with_visibility(&self, visibility: Bitmap) -> Self {
DataChunk::new(self.columns.clone(), visibility)
}
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl StreamChunk {
self.data.capacity()
}

/// Returns the selectivity reported by the underlying data chunk.
pub fn selectivity(&self) -> f64 {
    let data = &self.data;
    data.selectivity()
}

/// Get the reference of the underlying data chunk.
pub fn data_chunk(&self) -> &DataChunk {
&self.data
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ async fn test_merger_sum_aggr() {
],
3,
MultiMap::new(),
0.0,
);

let items = Arc::new(Mutex::new(vec![]));
Expand Down
21 changes: 16 additions & 5 deletions src/stream/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ struct Inner {
/// All the watermark derivations, (input_column_index, output_column_index). And the
/// derivation expression is the project's expression itself.
watermark_derivations: MultiMap<usize, usize>,

/// The selectivity threshold, which should be in [0, 1]. For a chunk whose selectivity is less
/// than or equal to the threshold, the Project executor will compact it into a new chunk
/// before expression evaluation.
materialize_selectivity_threshold: f64,
}

impl ProjectExecutor {
Expand All @@ -50,6 +54,7 @@ impl ProjectExecutor {
exprs: Vec<BoxedExpression>,
executor_id: u64,
watermark_derivations: MultiMap<usize, usize>,
materialize_selectivity_threshold: f64,
) -> Self {
let info = ExecutorInfo {
schema: input.schema().to_owned(),
Expand All @@ -74,6 +79,7 @@ impl ProjectExecutor {
},
exprs,
watermark_derivations,
materialize_selectivity_threshold,
},
}
}
Expand Down Expand Up @@ -110,10 +116,12 @@ impl Inner {
&self,
chunk: StreamChunk,
) -> StreamExecutorResult<Option<StreamChunk>> {
let chunk = chunk.compact();

let chunk = if chunk.selectivity() <= self.materialize_selectivity_threshold {
chunk.compact()
} else {
chunk
};
let (data_chunk, ops) = chunk.into_parts();

let mut projected_columns = Vec::new();

for expr in &self.exprs {
Expand All @@ -125,8 +133,9 @@ impl Inner {
let new_column = Column::new(evaluated_expr);
projected_columns.push(new_column);
}

let new_chunk = StreamChunk::new(ops, projected_columns, None);
let (_, vis) = data_chunk.into_parts();
let vis = vis.into_visibility();
let new_chunk = StreamChunk::new(ops, projected_columns, vis);
Ok(Some(new_chunk))
}

Expand Down Expand Up @@ -233,6 +242,7 @@ mod tests {
vec![test_expr],
1,
MultiMap::new(),
0.0,
));
let mut project = project.execute();

Expand Down Expand Up @@ -296,6 +306,7 @@ mod tests {
vec![a_expr, b_expr],
1,
MultiMap::from_iter(vec![(0, 0), (0, 1)].into_iter()),
0.0,
));
let mut project = project.execute();

Expand Down
8 changes: 7 additions & 1 deletion src/stream/src/from_proto/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use multimap::MultiMap;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::expr::expr_node;
use risingwave_pb::stream_plan::ProjectNode;

use super::*;
Expand Down Expand Up @@ -49,14 +50,19 @@ impl ExecutorBuilder for ProjectExecutorBuilder {
.map(|key| *key as usize),
),
);

let extremely_light = node.get_select_list().iter().all(|expr| {
let expr_type = expr.get_expr_type().unwrap();
expr_type == expr_node::Type::InputRef || expr_type == expr_node::Type::ConstantValue
});
let materialize_selectivity_threshold = if extremely_light { 0.0 } else { 0.5 };
Ok(ProjectExecutor::new(
params.actor_context,
input,
params.pk_indices,
project_exprs,
params.executor_id,
watermark_derivations,
materialize_selectivity_threshold,
)
.boxed())
}
Expand Down