57 changes: 57 additions & 0 deletions e2e_test/streaming/union.slt
@@ -0,0 +1,57 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t1 (v1 int, v2 int);

statement ok
create table t2 (v1 int, v3 int);

statement ok
create materialized view v as select * from t1 union all select * from t2;

query II
select * from v;
----

statement ok
insert into t1 values(1, 2);

query II
select * from v;
----
1 2

statement ok
insert into t2 values(1, 2);


query II
select * from v;
----
1 2
1 2

statement ok
delete from t1 where v1 = 1;

query II
select * from v;
----
1 2

statement ok
delete from t2 where v1 = 1;

query II
select * from v;
----

statement ok
drop materialized view v;

statement ok
drop table t1;

statement ok
drop table t2;
30 changes: 30 additions & 0 deletions src/frontend/planner_test/tests/testdata/union.yaml
@@ -9,6 +9,36 @@
| └─BatchScan { table: t1, columns: [t1.a, t1.b, t1.c], distribution: SomeShard }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t2, columns: [t2.a, t2.b, t2.c], distribution: SomeShard }
stream_plan: |
StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), null:Int64(hidden), 0:Int32(hidden)], pk_columns: [t1._row_id, null:Int64, 0:Int32] }
└─StreamUnion { all: true }
├─StreamExchange { dist: Single }
| └─StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Int64, 0:Int32] }
| └─StreamTableScan { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [t2.a, t2.b, t2.c, null:Int64, t2._row_id, 1:Int32] }
└─StreamTableScan { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
stream_dist_plan: |
Fragment 0
StreamMaterialize { columns: [a, b, c, t1._row_id(hidden), null:Int64(hidden), 0:Int32(hidden)], pk_columns: [t1._row_id, null:Int64, 0:Int32] }
materialized table: 4294967294
StreamUnion { all: true }
StreamExchange Single from 1
StreamExchange Single from 2

Fragment 1
StreamProject { exprs: [t1.a, t1.b, t1.c, t1._row_id, null:Int64, 0:Int32] }
Chain { table: t1, columns: [t1.a, t1.b, t1.c, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
Upstream
BatchPlanNode

Fragment 2
StreamProject { exprs: [t2.a, t2.b, t2.c, null:Int64, t2._row_id, 1:Int32] }
Chain { table: t2, columns: [t2.a, t2.b, t2.c, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
Upstream
BatchPlanNode

Table 4294967294 { columns: [a, b, c, t1._row_id, null:Int64, 0:Int32], primary key: [$3 ASC, $4 ASC, $5 ASC], value indices: [0, 1, 2, 3, 4, 5], distribution key: [] }
- sql: |
create table t1 (a int, b numeric, c bigint);
create table t2 (a int, b numeric, c bigint);
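The stream plan in the first test case above makes UNION ALL rows uniquely identifiable: each input keeps its own row-id column, pads the other inputs' row-id slots with `null:Int64`, and appends a constant source tag (`0:Int32` for t1, `1:Int32` for t2). Below is a minimal, hypothetical sketch of that pad-and-tag layout — not RisingWave's actual executor code; the `Datum` enum, `project_for_union` helper, and the sample row-id value are invented for illustration (and the tag is simplified to `i64` rather than `Int32`).

```rust
#[derive(Debug, PartialEq)]
enum Datum {
    Null,
    Int(i64),
}

/// Hypothetical helper: project one input's row into the union layout.
/// `n_inputs` row-id slots are appended, with only this input's slot filled,
/// followed by a constant source tag.
fn project_for_union(data: Vec<Datum>, row_id: i64, source: usize, n_inputs: usize) -> Vec<Datum> {
    let mut row = data;
    for i in 0..n_inputs {
        row.push(if i == source { Datum::Int(row_id) } else { Datum::Null });
    }
    row.push(Datum::Int(source as i64)); // the 0:Int32 / 1:Int32 tag in the plan
    row
}

fn main() {
    // Mirrors the e2e test above: t1 and t2 each hold the row (1, 2),
    // and we even give both rows the same (hypothetical) row id.
    let from_t1 = project_for_union(vec![Datum::Int(1), Datum::Int(2)], 100, 0, 2);
    let from_t2 = project_for_union(vec![Datum::Int(1), Datum::Int(2)], 100, 1, 2);
    // Same user-visible data, but the (row-id slots..., tag) suffix differs,
    // so the materialized view's pk keeps both rows distinct.
    assert_ne!(from_t1, from_t2);
    println!("{from_t1:?}\n{from_t2:?}");
}
```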
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/batch_union.rs
@@ -62,6 +62,7 @@ impl PlanTreeNode for BatchUnion {
}

fn clone_with_inputs(&self, inputs: &[crate::optimizer::PlanRef]) -> PlanRef {
        // For batch queries, we don't need to clone `source_col`, so we just use `new`.
Self::new(LogicalUnion::new(self.logical.all(), inputs.to_owned())).into()
}
}
12 changes: 12 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic.rs
@@ -1111,3 +1111,15 @@ impl<PlanRef: GenericPlanRef> Project<PlanRef> {
self.o2i_col_mapping().inverse()
}
}

/// `Union` returns the union of the rows of its inputs.
/// If `all` is false, it needs to eliminate duplicates.
#[derive(Debug, Clone)]
pub struct Union<PlanRef> {
pub all: bool,
pub inputs: Vec<PlanRef>,
    /// Used by streaming processing: `source_col` identifies which source input each record
    /// came from.
    /// It is a logical property because the logical pk is derived based on it.
pub source_col: Option<usize>,
}
26 changes: 26 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic_derive.rs
@@ -385,3 +385,29 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for Expand<PlanRef> {
self.input.ctx()
}
}

impl<PlanRef: GenericPlanRef> GenericPlanNode for Union<PlanRef> {
fn schema(&self) -> Schema {
self.inputs[0].schema().clone()
Review comment (Contributor): Should we add the hidden column here?

Reply (@chenzl25, Nov 17, 2022): Probably not. The input schema already contains the hidden column, and we can find it via `source_col`.
}

fn logical_pk(&self) -> Option<Vec<usize>> {
        // The union's pk is all of its inputs' pks combined, plus `source_col` if it exists.
let mut pk_indices = vec![];
for input in &self.inputs {
for pk in input.logical_pk() {
if !pk_indices.contains(pk) {
pk_indices.push(*pk);
}
}
}
if let Some(source_col) = self.source_col {
pk_indices.push(source_col)
}
Some(pk_indices)
}

fn ctx(&self) -> OptimizerContextRef {
self.inputs[0].ctx()
}
}
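To tie `logical_pk` back to the plan above: after projection, t1's pk lands at column 3 (`t1._row_id`), t2's at column 4 (its row-id slot), and the source tag at column 5, so the derived union pk is `[3, 4, 5]` — exactly the `primary key: [$3 ASC, $4 ASC, $5 ASC]` of table 4294967294. The snippet below is a standalone, hypothetical re-statement of that derivation (same logic as `logical_pk` above, with the plan-node types stripped away); `union_logical_pk` is an invented name, not part of the codebase.

```rust
/// Hypothetical standalone version of `Union::logical_pk`:
/// dedup-union the inputs' pk indices, then append `source_col`.
fn union_logical_pk(input_pks: &[Vec<usize>], source_col: Option<usize>) -> Vec<usize> {
    let mut pk_indices = Vec::new();
    for pks in input_pks {
        for &pk in pks {
            if !pk_indices.contains(&pk) {
                pk_indices.push(pk);
            }
        }
    }
    if let Some(col) = source_col {
        pk_indices.push(col);
    }
    pk_indices
}

fn main() {
    // t1's row-id slot is column 3, t2's is column 4, the source tag is column 5.
    assert_eq!(union_logical_pk(&[vec![3], vec![4]], Some(5)), vec![3, 4, 5]);
}
```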