Skip to content
Merged
22 changes: 5 additions & 17 deletions rust/frontend/src/optimizer/plan_node/batch_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,26 @@ use std::fmt;

use risingwave_common::catalog::Schema;

use super::{PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use super::{BatchBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithDistribution, WithOrder, WithSchema};

/// `BatchExchange` imposes a particular distribution on its input
/// without changing its content.
#[derive(Debug, Clone)]
pub struct BatchExchange {
order: Order,
pub base: BatchBase,
input: PlanRef,
schema: Schema,
dist: Distribution,
}

impl BatchExchange {
pub fn new(input: PlanRef, order: Order, dist: Distribution) -> Self {
let schema = input.schema().clone();
let base = BatchBase { order, dist };
BatchExchange {
input,
order,
schema,
dist,
base,
}
}
}
Expand All @@ -38,21 +37,10 @@ impl PlanTreeNodeUnary for BatchExchange {
self.input.clone()
}
fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(input, self.order.clone(), self.distribution().clone())
Self::new(input, self.order().clone(), self.distribution().clone())
}
}
impl_plan_tree_node_for_unary! {BatchExchange}
impl WithOrder for BatchExchange {
fn order(&self) -> &Order {
&self.order
}
}

impl WithDistribution for BatchExchange {
fn distribution(&self) -> &Distribution {
&self.dist
}
}

impl WithSchema for BatchExchange {
fn schema(&self) -> &Schema {
Expand Down
17 changes: 11 additions & 6 deletions rust/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,31 @@ use std::fmt;
use risingwave_common::catalog::Schema;

use super::{
EqJoinPredicate, LogicalJoin, PlanRef, PlanTreeNodeBinary, ToBatchProst, ToDistributedBatch,
BatchBase, EqJoinPredicate, LogicalJoin, PlanRef, PlanTreeNodeBinary, ToBatchProst,
ToDistributedBatch,
};
use crate::optimizer::property::{Distribution, WithDistribution, WithOrder, WithSchema};
use crate::optimizer::property::{Distribution, Order, WithSchema};

/// `BatchHashJoin` implements [`super::LogicalJoin`] with hash table. It builds a hash table
/// from inner (right-side) relation and then probes with data from outer (left-side) relation to
/// get output rows.
#[derive(Debug, Clone)]
pub struct BatchHashJoin {
pub base: BatchBase,
logical: LogicalJoin,
eq_join_predicate: EqJoinPredicate,
}

impl BatchHashJoin {
pub fn new(logical: LogicalJoin, eq_join_predicate: EqJoinPredicate) -> Self {
// TODO: derive from input
let base = BatchBase {
order: Order::any().clone(),
dist: Distribution::any().clone(),
};

Self {
base,
logical,
eq_join_predicate,
}
Expand Down Expand Up @@ -58,10 +67,6 @@ impl PlanTreeNodeBinary for BatchHashJoin {
}
impl_plan_tree_node_for_binary! {BatchHashJoin}

impl WithOrder for BatchHashJoin {}

impl WithDistribution for BatchHashJoin {}

impl WithSchema for BatchHashJoin {
fn schema(&self) -> &Schema {
self.logical.schema()
Expand Down
16 changes: 10 additions & 6 deletions rust/frontend/src/optimizer/plan_node/batch_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@ use std::fmt;

use risingwave_common::catalog::Schema;

use super::{LogicalLimit, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, WithDistribution, WithOrder, WithSchema};
use super::{
BatchBase, LogicalLimit, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch,
};
use crate::optimizer::property::{Distribution, WithSchema};

/// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input
#[derive(Debug, Clone)]
pub struct BatchLimit {
pub base: BatchBase,
logical: LogicalLimit,
}

impl BatchLimit {
pub fn new(logical: LogicalLimit) -> Self {
BatchLimit { logical }
let base = BatchBase {
order: logical.input().order().clone(),
dist: logical.input().distribution().clone(),
};
BatchLimit { logical, base }
}
}

Expand All @@ -32,9 +39,6 @@ impl PlanTreeNodeUnary for BatchLimit {
}
}
impl_plan_tree_node_for_unary! {BatchLimit}
impl WithOrder for BatchLimit {}

impl WithDistribution for BatchLimit {}

impl WithSchema for BatchLimit {
fn schema(&self) -> &Schema {
Expand Down
18 changes: 11 additions & 7 deletions rust/frontend/src/optimizer/plan_node/batch_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,27 @@ use std::fmt;

use risingwave_common::catalog::Schema;

use super::{LogicalProject, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, WithDistribution, WithOrder, WithSchema};
use super::{
BatchBase, LogicalProject, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch,
};
use crate::optimizer::property::{Distribution, Order, WithSchema};

/// `BatchProject` implements [`super::LogicalProject`] to evaluate specified expressions on input
/// rows
#[derive(Debug, Clone)]
pub struct BatchProject {
pub base: BatchBase,
logical: LogicalProject,
}

impl BatchProject {
pub fn new(logical: LogicalProject) -> Self {
BatchProject { logical }
// TODO: derive from input
let base = BatchBase {
order: Order::any().clone(),
dist: Distribution::any().clone(),
};
BatchProject { logical, base }
}
}

Expand All @@ -35,10 +43,6 @@ impl PlanTreeNodeUnary for BatchProject {

impl_plan_tree_node_for_unary! { BatchProject }

impl WithOrder for BatchProject {}

impl WithDistribution for BatchProject {}

impl WithSchema for BatchProject {
fn schema(&self) -> &Schema {
self.logical.schema()
Expand Down
19 changes: 11 additions & 8 deletions rust/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use std::fmt;

use risingwave_common::catalog::Schema;

use super::{PlanRef, ToBatchProst, ToDistributedBatch};
use super::{BatchBase, PlanRef, ToBatchProst, ToDistributedBatch};
use crate::optimizer::plan_node::LogicalScan;
use crate::optimizer::property::{WithDistribution, WithOrder, WithSchema};
use crate::optimizer::property::{Distribution, Order, WithSchema};

/// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct BatchSeqScan {
pub base: BatchBase,
logical: LogicalScan,
}

Expand All @@ -20,7 +21,13 @@ impl WithSchema for BatchSeqScan {

impl BatchSeqScan {
pub fn new(logical: LogicalScan) -> Self {
Self { logical }
// TODO: derive from input
let base = BatchBase {
order: Order::any().clone(),
dist: Distribution::any().clone(),
};

Self { logical, base }
}
}

Expand All @@ -33,10 +40,6 @@ impl fmt::Display for BatchSeqScan {
}
}

impl WithOrder for BatchSeqScan {}

impl WithDistribution for BatchSeqScan {}

impl ToDistributedBatch for BatchSeqScan {
fn to_distributed(&self) -> PlanRef {
self.clone().into()
Expand Down
24 changes: 6 additions & 18 deletions rust/frontend/src/optimizer/plan_node/batch_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,27 @@ use std::fmt;

use risingwave_common::catalog::Schema;

use super::{PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithDistribution, WithOrder, WithSchema};
use super::{BatchBase, PlanRef, PlanTreeNodeUnary, ToBatchProst, ToDistributedBatch};
use crate::optimizer::property::{Distribution, Order, WithSchema};

/// `BatchSort` buffers all data from input and sort these rows by specified order, providing the
/// collation required by user or parent plan node.
#[derive(Debug, Clone)]
pub struct BatchSort {
order: Order,
pub base: BatchBase,
input: PlanRef,
schema: Schema,
dist: Distribution,
}

impl BatchSort {
pub fn new(input: PlanRef, order: Order) -> Self {
let schema = input.schema().clone();
let dist = input.distribution().clone();
let base = BatchBase { order, dist };
BatchSort {
input,
order,
base,
schema,
dist,
}
}
}
Expand All @@ -39,21 +38,10 @@ impl PlanTreeNodeUnary for BatchSort {
self.input.clone()
}
fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(input, self.order.clone())
Self::new(input, self.base.order.clone())
}
}
impl_plan_tree_node_for_unary! {BatchSort}
impl WithOrder for BatchSort {
fn order(&self) -> &Order {
&self.order
}
}

impl WithDistribution for BatchSort {
fn distribution(&self) -> &Distribution {
&self.dist
}
}

impl WithSchema for BatchSort {
fn schema(&self) -> &Schema {
Expand Down
25 changes: 11 additions & 14 deletions rust/frontend/src/optimizer/plan_node/batch_sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,32 @@ use std::fmt;
use risingwave_common::catalog::Schema;

use super::{
EqJoinPredicate, LogicalJoin, PlanRef, PlanTreeNodeBinary, ToBatchProst, ToDistributedBatch,
};
use crate::optimizer::property::{
Direction, Distribution, FieldOrder, Order, WithDistribution, WithOrder, WithSchema,
BatchBase, EqJoinPredicate, LogicalJoin, PlanRef, PlanTreeNodeBinary, ToBatchProst,
ToDistributedBatch,
};
use crate::optimizer::property::{Direction, Distribution, FieldOrder, Order, WithSchema};

/// `BatchSortMergeJoin` implements [`super::LogicalJoin`] by merging left & right relations in
/// a streaming manner. The input relation must have been ordered by the equi-join key(s).
#[derive(Debug, Clone)]
pub struct BatchSortMergeJoin {
pub base: BatchBase,
logical: LogicalJoin,
eq_join_predicate: EqJoinPredicate,
order: Order,
}

impl BatchSortMergeJoin {
pub fn new(logical: LogicalJoin, eq_join_predicate: EqJoinPredicate) -> Self {
let order = Self::derive_order(logical.left().order(), logical.right().order());
// TODO: derive from input
let base = BatchBase {
order,
dist: Distribution::any().clone(),
};

Self {
logical,
order,
base,
eq_join_predicate,
}
}
Expand Down Expand Up @@ -87,14 +92,6 @@ impl PlanTreeNodeBinary for BatchSortMergeJoin {
}
impl_plan_tree_node_for_binary! {BatchSortMergeJoin}

impl WithOrder for BatchSortMergeJoin {
fn order(&self) -> &Order {
&self.order
}
}

impl WithDistribution for BatchSortMergeJoin {}

impl WithSchema for BatchSortMergeJoin {
fn schema(&self) -> &Schema {
self.logical.schema()
Expand Down
19 changes: 5 additions & 14 deletions rust/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::expr::AggKind;
use risingwave_common::types::DataType;

use super::{ColPrunable, PlanRef, PlanTreeNodeUnary, ToBatch, ToStream};
use super::{ColPrunable, LogicalBase, PlanRef, PlanTreeNodeUnary, ToBatch, ToStream};
use crate::expr::ExprImpl;
use crate::optimizer::plan_node::LogicalProject;
use crate::optimizer::property::{WithDistribution, WithOrder, WithSchema};
use crate::optimizer::property::WithSchema;
use crate::utils::ColIndexMapping;

/// Aggregation Call
Expand All @@ -31,10 +31,10 @@ pub struct PlanAggCall {
/// functions in the `SELECT` clause.
#[derive(Clone, Debug)]
pub struct LogicalAgg {
pub base: LogicalBase,
agg_calls: Vec<PlanAggCall>,
agg_call_alias: Vec<Option<String>>,
group_keys: Vec<usize>,
schema: Schema,
input: PlanRef,
}

Expand All @@ -54,11 +54,12 @@ impl LogicalAgg {
.collect(),
&agg_call_alias,
);
let base = LogicalBase { schema };
Self {
agg_calls,
group_keys,
input,
schema,
base,
agg_call_alias,
}
}
Expand Down Expand Up @@ -138,16 +139,6 @@ impl fmt::Display for LogicalAgg {
}
}

impl WithOrder for LogicalAgg {}

impl WithDistribution for LogicalAgg {}

impl WithSchema for LogicalAgg {
fn schema(&self) -> &Schema {
&self.schema
}
}

impl ColPrunable for LogicalAgg {
fn prune_col(&self, required_cols: &FixedBitSet) -> PlanRef {
self.must_contain_columns(required_cols);
Expand Down
Loading