Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d6da71f
temp commit
wcy-fdu Oct 25, 2022
370987a
framework
wcy-fdu Oct 25, 2022
dfbd7dc
temp_commit
wcy-fdu Nov 7, 2022
ddb7855
temp_commit
wcy-fdu Nov 7, 2022
9c958ab
temp commit
wcy-fdu Nov 7, 2022
ca0ec5e
framework done
wcy-fdu Nov 8, 2022
9d616dd
temp commit
wcy-fdu Nov 8, 2022
25dc53f
bug need to fix
wcy-fdu Nov 9, 2022
8a1d8cb
need to add ut
wcy-fdu Nov 9, 2022
9f25606
merge main and resolve conflict
wcy-fdu Nov 10, 2022
77845f6
ensure correctness
wcy-fdu Nov 10, 2022
7daeae0
refactor
wcy-fdu Nov 10, 2022
f8d18b0
todo: add more unit test
wcy-fdu Nov 10, 2022
8e77946
todo: add comments, clean up code
wcy-fdu Nov 11, 2022
d9dc654
todo: clean up code
wcy-fdu Nov 11, 2022
5855c5f
typo fix
wcy-fdu Nov 11, 2022
cebc11a
typo fix
wcy-fdu Nov 11, 2022
75692a2
refactor code
wcy-fdu Nov 11, 2022
37c2167
ready for review
wcy-fdu Nov 11, 2022
e317efa
code refactor
wcy-fdu Nov 11, 2022
d7fcc2e
add e2e and ut
wcy-fdu Nov 14, 2022
3ec6b84
ready for review
wcy-fdu Nov 14, 2022
21083a6
dashboard
yuhao-su Nov 14, 2022
c28b177
change cache row to compacted_row and add get_compacted_row for state…
wcy-fdu Nov 15, 2022
2cbb8b9
change cache row to compacted_row and add get_compacted_row for state…
wcy-fdu Nov 15, 2022
c339e53
materialize cache do not store vnode in key
wcy-fdu Nov 15, 2022
fd3974c
clean up state table get_row
wcy-fdu Nov 15, 2022
58d2737
materialize source handle pk conflict
wcy-fdu Nov 15, 2022
8cd1bcf
Merge branch 'main' of https://github.com/singularity-data/risingwave…
wcy-fdu Nov 15, 2022
716069e
merge main
wcy-fdu Nov 15, 2022
4c307e2
merge main
wcy-fdu Nov 15, 2022
24aa73e
mv check concurrent read
st1page Nov 15, 2022
6f99ede
improve panic msg
st1page Nov 15, 2022
f63674d
fix
wcy-fdu Nov 16, 2022
2ccc42e
merge main
wcy-fdu Nov 16, 2022
b4e34bb
refine the code
st1page Nov 16, 2022
8ce9519
clippy
st1page Nov 16, 2022
fd74a70
Delete package-lock.json
st1page Nov 16, 2022
2c3b44f
Delete package.json
st1page Nov 16, 2022
590c556
Merge branch 'main' into wcy/add_cache_for_materialize
mergify[bot] Nov 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dashboard/proto/gen/catalog.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ message Table {
// `[0..columns.len()]`.
repeated int32 value_indices = 19;
string definition = 20;
bool handle_pk_conflict = 21;
}

message View {
Expand Down
4 changes: 2 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ message MaterializeNode {
// Used for internal table states.
catalog.Table table = 3;
// Used to control whether to perform a sanity check; enable it when the upstream executor is a source executor.
bool ignore_on_conflict = 4;
bool handle_pk_conflict = 4;
}

message AggCallState {
Expand Down Expand Up @@ -349,7 +349,7 @@ message ArrangeNode {
// Used for internal table states.
catalog.Table table = 3;
// Used to control whether to perform a sanity check; enable it when the upstream executor is a source executor.
bool ignore_on_conflict = 4;
bool handle_pk_conflict = 4;
}

// Special node for shared state. LookupNode will join an arrangement with a stream.
Expand Down
3 changes: 3 additions & 0 deletions src/compute/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ async fn test_table_materialize() -> StreamResult<()> {
vec![OrderPair::new(0, OrderType::Ascending)],
all_column_ids.clone(),
2,
None,
0,
false,
)
.await
.boxed()
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub async fn make_state_table<S: StateStore>(hummock: S, table: &TableCatalog) -
table.pk().iter().map(|x| x.direct.to_order()).collect(),
table.pk().iter().map(|x| x.index).collect(),
Distribution::all_vnodes(table.distribution_key().to_vec()), // scan all vnodes
table.value_indices.clone(),
Some(table.value_indices.clone()),
)
.await
}
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub struct TableCatalog {

/// Definition of the materialized view.
pub definition: String,

pub handle_pk_conflict: bool,
}

impl TableCatalog {
Expand All @@ -109,6 +111,10 @@ impl TableCatalog {
self
}

pub fn handle_pk_conflict(&self) -> bool {
self.handle_pk_conflict
}

/// Get the table catalog's associated source id.
#[must_use]
pub fn associated_source_id(&self) -> Option<TableId> {
Expand Down Expand Up @@ -190,6 +196,7 @@ impl TableCatalog {
.map(|i| ProstColumnIndex { index: i as _ }),
value_indices: self.value_indices.iter().map(|x| *x as _).collect(),
definition: self.definition.clone(),
handle_pk_conflict: self.handle_pk_conflict,
}
}
}
Expand Down Expand Up @@ -236,6 +243,7 @@ impl From<ProstTable> for TableCatalog {
vnode_col_idx: tb.vnode_col_idx.map(|x| x.index as usize),
value_indices: tb.value_indices.iter().map(|x| *x as _).collect(),
definition: tb.definition.clone(),
handle_pk_conflict: tb.handle_pk_conflict,
}
}
}
Expand Down Expand Up @@ -320,6 +328,7 @@ mod tests {
vnode_col_idx: None,
value_indices: vec![0],
definition: "".into(),
handle_pk_conflict: false,
}
.into();

Expand Down Expand Up @@ -377,6 +386,7 @@ mod tests {
vnode_col_idx: None,
value_indices: vec![0],
definition: "".into(),
handle_pk_conflict: false
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub fn gen_create_mv_plan(
.into());
}
}
let materialize = plan_root.gen_create_mv_plan(table_name, definition, col_names)?;
let materialize = plan_root.gen_create_mv_plan(table_name, definition, col_names, false)?;
let mut table = materialize.table().to_prost(schema_id, database_id);
if session.config().get_create_compaction_group_for_mv() {
table.properties.insert(
Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ pub(crate) fn gen_materialized_source_plan(
// Manually assemble the materialization plan for the table.
let source_node: PlanRef = LogicalSource::new(Rc::new((&source).into()), context).into();
let row_id_index = source.row_id_index.as_ref().map(|index| index.index as _);
// If `row_id_index` is `Some`, the user has not specified a primary key; we then add a hidden
// column to store the row id, and the materialize executor does not need to handle pk conflicts.
let handle_pk_conflict = row_id_index.is_none();
let mut required_cols = FixedBitSet::with_capacity(source_node.schema().len());
required_cols.toggle_range(..);
let mut out_names = source_node.schema().names();
Expand All @@ -247,7 +250,7 @@ pub(crate) fn gen_materialized_source_plan(
required_cols,
out_names,
)
.gen_create_mv_plan(source.name.clone(), "".into(), None)?
.gen_create_mv_plan(source.name.clone(), "".into(), None, handle_pk_conflict)?
};
let mut table = materialize
.table()
Expand Down
3 changes: 3 additions & 0 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ impl PlanRoot {
mv_name: String,
definition: String,
col_names: Option<Vec<String>>,
handle_pk_conflict: bool,
) -> Result<StreamMaterialize> {
let out_names = if let Some(col_names) = col_names {
col_names
Expand All @@ -498,6 +499,7 @@ impl PlanRoot {
out_names,
false,
definition,
handle_pk_conflict,
)
}

Expand All @@ -513,6 +515,7 @@ impl PlanRoot {
self.out_names.clone(),
true,
"".into(),
false,
)
}

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/optimizer/plan_node/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ pub fn to_stream_prost_body(
table_id: 0,
column_orders: me.table.pk().iter().map(FieldOrder::to_protobuf).collect(),
table: Some(me.table.to_internal_table_prost()),
ignore_on_conflict: true,
handle_pk_conflict: false,
})
}
Node::ProjectSet(me) => {
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ impl StreamMaterialize {
out_names: Vec<String>,
is_index: bool,
definition: String,
handle_pk_conflict: bool,
) -> Result<Self> {
let required_dist = match input.distribution() {
Distribution::Single => RequiredDist::single(),
Expand Down Expand Up @@ -167,7 +168,6 @@ impl StreamMaterialize {

let ctx = input.ctx();
let properties = ctx.inner().with_options.internal_table_subset();

let table = TableCatalog {
id: TableId::placeholder(),
associated_source_id: None,
Expand All @@ -185,6 +185,7 @@ impl StreamMaterialize {
vnode_col_idx: None,
value_indices,
definition,
handle_pk_conflict,
};

Ok(Self { base, input, table })
Expand Down Expand Up @@ -265,7 +266,7 @@ impl StreamNode for StreamMaterialize {
.map(FieldOrder::to_protobuf)
.collect(),
table: Some(self.table().to_internal_table_prost()),
ignore_on_conflict: true,
handle_pk_conflict: self.table.handle_pk_conflict(),
})
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ impl TableCatalogBuilder {
.value_indices
.unwrap_or_else(|| (0..self.columns.len()).collect_vec()),
definition: "".into(),
handle_pk_conflict: false,
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/test_fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ fn make_stream_fragments() -> Vec<StreamFragment> {
table_id: 1,
table: Some(make_internal_table(4, true)),
column_orders: vec![make_column_order(1), make_column_order(2)],
ignore_on_conflict: true,
handle_pk_conflict: false,
})),
fields: vec![], // TODO: fill this later
operator_id: 7,
Expand Down
1 change: 1 addition & 0 deletions src/storage/hummock_sdk/src/filter_key_extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ mod tests {
vnode_col_idx: None,
value_indices: vec![0],
definition: "".into(),
handle_pk_conflict: false,
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/storage/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ fn compute_vnode(row: &Row, indices: &[usize], vnodes: &Bitmap) -> VirtualNode {
}

/// Get vnode values with `indices` on the given `chunk`.
fn compute_chunk_vnode(chunk: &DataChunk, indices: &[usize], vnodes: &Bitmap) -> Vec<VirtualNode> {
pub fn compute_chunk_vnode(
chunk: &DataChunk,
indices: &[usize],
vnodes: &Bitmap,
) -> Vec<VirtualNode> {
if indices.is_empty() {
vec![DEFAULT_VNODE; chunk.capacity()]
} else {
Expand Down
Loading