1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

30 changes: 24 additions & 6 deletions rust/frontend/src/optimizer/plan_node/mod.rs
@@ -91,16 +91,25 @@ impl dyn PlanNode {

     /// Serialize the plan node and its children to a batch plan proto.
     pub fn to_batch_prost(&self) -> BatchPlanProst {
+        self.to_batch_prost_identity(true)
+    }
+
+    /// Serialize the plan node and its children to a batch plan proto without the identity field
+    /// (for testing).
+    pub fn to_batch_prost_identity(&self, identity: bool) -> BatchPlanProst {
         let node_body = Some(self.to_batch_prost_body());
         let children = self
             .inputs()
             .into_iter()
-            .map(|plan| plan.to_batch_prost())
+            .map(|plan| plan.to_batch_prost_identity(identity))
             .collect();
-        let identity = format!("{:?}", self);
         BatchPlanProst {
             children,
-            identity,
+            identity: if identity {
+                format!("{:?}", self)
+            } else {
+                "".into()
+            },
             node_body,
         }
     }
@@ -110,6 +119,12 @@ impl dyn PlanNode {
     /// Note that [`StreamTableScan`] has its own implementation of `to_stream_prost`. We have a
     /// hook inside to do some ad-hoc thing for [`StreamTableScan`].
     pub fn to_stream_prost(&self) -> StreamPlanProst {
+        self.to_stream_prost_identity(true)
+    }
+
+    /// Serialize the plan node and its children to a stream plan proto without identity (for
+    /// testing).
+    pub fn to_stream_prost_identity(&self, identity: bool) -> StreamPlanProst {
         if let Some(stream_scan) = self.as_stream_table_scan() {
             return stream_scan.adhoc_to_stream_prost();
         }
@@ -118,13 +133,16 @@
         let input = self
             .inputs()
             .into_iter()
-            .map(|plan| plan.to_stream_prost())
+            .map(|plan| plan.to_stream_prost_identity(identity))
             .collect();
-        let identity = format!("{:?}", self);
         // TODO: support pk_indices and operator_id
         StreamPlanProst {
             input,
-            identity,
+            identity: if identity {
+                format!("{:?}", self)
+            } else {
+                "".into()
+            },
             node,
             operator_id: 0,
             pk_indices: vec![],
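In both methods the identity string is just the node's Debug rendering, so it changes whenever operator formatting changes; passing identity = false blanks it out, which makes the proto suitable for snapshot comparison. A minimal sketch of the intended call pattern, mirroring what the test runner below does (the module path in the use line is an assumption about the crate layout, not something this diff shows):

use risingwave_frontend::optimizer::plan_node::PlanNode; // module path assumed

// Serialize a batch plan to pretty-printed JSON with every node's `identity`
// left empty, so the output depends only on plan structure, not on the Debug
// formatting of the operators.
fn batch_plan_to_stable_json(plan: &dyn PlanNode) -> serde_json::Result<String> {
    serde_json::to_string_pretty(&plan.to_batch_prost_identity(false))
}

Regular (non-test) callers are unaffected: to_batch_prost() still delegates with identity = true and fills in the Debug-derived string.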
1 change: 1 addition & 0 deletions rust/frontend/test_runner/Cargo.toml
@@ -11,6 +11,7 @@ futures = { version = "0.3", default-features = false, features = ["alloc"] }
 risingwave_frontend = { path = ".." }
 risingwave_sqlparser = { path = "../../sqlparser" }
 serde = { version = "1", features = ["derive"] }
+serde_json = "1"
 serde_with = "1"
 serde_yaml = "0.8"
 tokio = { version = "1", features = [
86 changes: 80 additions & 6 deletions rust/frontend/test_runner/src/lib.rs
@@ -35,26 +35,66 @@ use serde::{Deserialize, Serialize};
 #[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
 #[serde(deny_unknown_fields)]
 pub struct TestCase {
+    /// The SQL statements
     pub sql: String,
+
+    /// The original logical plan
     pub logical_plan: Option<String>,
+
+    /// Logical plan with optimization `.gen_optimized_logical_plan()`
     pub optimized_logical_plan: Option<String>,
+
+    /// Distributed batch plan `.gen_dist_batch_query_plan()`
     pub batch_plan: Option<String>,
+
+    /// Proto JSON of generated batch plan
+    pub batch_plan_proto: Option<String>,
+
+    /// Create MV plan `.gen_create_mv_plan()`
     pub stream_plan: Option<String>,
+
+    /// Proto JSON of generated stream plan
+    pub stream_plan_proto: Option<String>,
+
+    /// Error of binder
     pub binder_error: Option<String>,
+
+    /// Error of planner
     pub planner_error: Option<String>,
+
+    /// Error of optimizer
     pub optimizer_error: Option<String>,
 }

 #[serde_with::skip_serializing_none]
 #[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
 #[serde(deny_unknown_fields)]
 pub struct TestCaseResult {
+    /// The original logical plan
     pub logical_plan: Option<String>,
+
+    /// Logical plan with optimization `.gen_optimized_logical_plan()`
     pub optimized_logical_plan: Option<String>,
+
+    /// Distributed batch plan `.gen_dist_batch_query_plan()`
     pub batch_plan: Option<String>,
+
+    /// Proto JSON of generated batch plan
+    pub batch_plan_proto: Option<String>,
+
+    /// Create MV plan `.gen_create_mv_plan()`
     pub stream_plan: Option<String>,
+
+    /// Proto JSON of generated stream plan
+    pub stream_plan_proto: Option<String>,
+
+    /// Error of binder
     pub binder_error: Option<String>,
+
+    /// Error of planner
     pub planner_error: Option<String>,
+
+    /// Error of optimizer
     pub optimizer_error: Option<String>,
 }
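Given these structs, a test case opts into the proto comparison simply by having the corresponding field present; absent fields skip both generation and checking. A hypothetical example of building such a case in code (the crate name in the use line and the SQL/expected JSON values are placeholders, not taken from this PR):

use risingwave_frontend_test_runner::TestCase; // crate name assumed

// Build a test case that only exercises the batch-plan proto snapshot.
// `sql` is required; every other field is optional and `Default` leaves it None.
fn proto_snapshot_case(expected_batch_proto_json: String) -> TestCase {
    TestCase {
        sql: "select 1".into(),
        // Present => the runner generates the proto with identities blanked
        // (`to_batch_prost_identity(false)`) and compares it against this JSON.
        batch_plan_proto: Some(expected_batch_proto_json),
        ..Default::default()
    }
}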
@@ -67,6 +107,8 @@ impl TestCaseResult {
             optimized_logical_plan: self.optimized_logical_plan,
             batch_plan: self.batch_plan,
             stream_plan: self.stream_plan,
+            stream_plan_proto: self.stream_plan_proto,
+            batch_plan_proto: self.batch_plan_proto,
             planner_error: self.planner_error,
             optimizer_error: self.optimizer_error,
             binder_error: self.binder_error,

@@ -146,14 +188,36 @@
                 Some(explain_plan(&logical_plan.gen_optimized_logical_plan()));
         }

-        // Only generate batch_plan if it is specified in test case
-        if self.batch_plan.is_some() {
-            ret.batch_plan = Some(explain_plan(&logical_plan.gen_dist_batch_query_plan()));
+        if self.batch_plan.is_some() || self.batch_plan_proto.is_some() {
+            let batch_plan = logical_plan.gen_dist_batch_query_plan();
+
+            // Only generate batch_plan if it is specified in test case
+            if self.batch_plan.is_some() {
+                ret.batch_plan = Some(explain_plan(&batch_plan));

(Review thread on this line)
Collaborator: IIUC, the proto of batch plan contains the information of exchange source (node IP & port), so it's not possible to test it?
Contributor (author): We always run it with our own test runner, so we can get a consistent result by manually setting the IPs when creating the mock compute nodes...

+            }
+
+            // Only generate batch_plan_proto if it is specified in test case
+            if self.batch_plan_proto.is_some() {
+                ret.batch_plan_proto = Some(serde_json::to_string_pretty(
+                    &batch_plan.to_batch_prost_identity(false),
+                )?);
+            }
         }

-        // Only generate stream_plan if it is specified in test case
-        if self.stream_plan.is_some() {
-            ret.stream_plan = Some(explain_plan(&logical_plan.gen_create_mv_plan()));
+        if self.stream_plan.is_some() || self.stream_plan_proto.is_some() {
+            let stream_plan = logical_plan.gen_create_mv_plan();
+
+            // Only generate stream_plan if it is specified in test case
+            if self.stream_plan.is_some() {
+                ret.stream_plan = Some(explain_plan(&stream_plan));
+            }
+
+            // Only generate stream_plan_proto if it is specified in test case
+            if self.stream_plan_proto.is_some() {
+                ret.stream_plan_proto = Some(serde_json::to_string_pretty(
+                    &stream_plan.to_stream_prost_identity(false),
+                )?);
+            }
         }

         Ok(ret)
@@ -180,6 +244,16 @@ fn check_result(expected: &TestCase, actual: &TestCaseResult) -> Result<()> {
     )?;
     check_option_plan_eq("batch_plan", &expected.batch_plan, &actual.batch_plan)?;
     check_option_plan_eq("stream_plan", &expected.stream_plan, &actual.stream_plan)?;
+    check_option_plan_eq(
+        "stream_plan_proto",
+        &expected.stream_plan_proto,
+        &actual.stream_plan_proto,
+    )?;
+    check_option_plan_eq(
+        "batch_plan_proto",
+        &expected.batch_plan_proto,
+        &actual.batch_plan_proto,
+    )?;

     Ok(())
 }
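On the review question above about the exchange-source addresses baked into the batch plan proto: the author's answer is that the test runner controls the mock compute nodes, so their addresses can be fixed. A purely illustrative sketch of that idea; the type and method names here are hypothetical, not the actual test-runner API:

use std::net::SocketAddr;

/// Hypothetical stand-in for the test runner's mock cluster.
struct MockCluster {
    compute_nodes: Vec<SocketAddr>,
}

impl MockCluster {
    /// Register compute nodes at fixed, hard-coded addresses. Because these
    /// never change between runs, the exchange-source host:port embedded in
    /// the serialized batch plan stays deterministic and can be compared
    /// against a checked-in JSON snapshot.
    fn with_fixed_compute_nodes() -> Self {
        let compute_nodes = ["127.0.0.1:5687", "127.0.0.1:5688"]
            .iter()
            .map(|addr| addr.parse().expect("valid socket address"))
            .collect();
        Self { compute_nodes }
    }
}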