Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions e2e_test/batch/tpch/q21.slt.part
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
onlyif risingwave
statement ok
SET enable_new_subquery_planner TO on
SET enable_new_subquery_batch_planner TO on

query TI
select
Expand Down Expand Up @@ -48,4 +48,4 @@ Supplier#000000005 15

onlyif risingwave
statement ok
SET enable_new_subquery_planner TO off
SET enable_new_subquery_batch_planner TO off
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private BatchPlannerConfigurations() {}

@Config
public static final ConfigEntry<Boolean> ENABLE_NEW_SUBQUERY_PLANNER =
ConfigEntry.<Boolean>builder("enable_new_subquery_planner")
ConfigEntry.<Boolean>builder("enable_new_subquery_batch_planner")
.setOptional(true)
.withDefaultValue(false)
.withConverter(BOOLEAN_PARSER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,17 @@ public static Configuration load(InputStream in, Class<?>... classes) {

private static ImmutableMap<String, ConfigEntry<?>> loadConfigRegistry() {
        // Load batch and stream planner options.
List<ConfigEntry<?>> configEntries = loadConfigEntries(BatchPlannerConfigurations.class);
List<List<ConfigEntry<?>>> configEntriesList =
List.of(
loadConfigEntries(BatchPlannerConfigurations.class),
loadConfigEntries(StreamPlannerConfigurations.class));
var mapBuilder = new ImmutableMap.Builder<String, ConfigEntry<?>>();
configEntries.forEach(
entry -> {
mapBuilder.put(entry.getKey(), entry);
});
for (var configEntries : configEntriesList) {
configEntries.forEach(
entry -> {
mapBuilder.put(entry.getKey(), entry);
});
}
return mapBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.risingwave.common.config;

import static com.risingwave.common.config.Parsers.BOOLEAN_PARSER;

/** Manages all config entries that can be set up in the stream planner. */
public final class StreamPlannerConfigurations {
  // Utility holder for static config entries; never instantiated.
  private StreamPlannerConfigurations() {}

  /**
   * The key of a config entry should be set exactly the same as the parameter explicitly declared
   * in SQL. For example, {@code SET enable_new_subquery_stream_planner TO true} => the key of the
   * {@code ENABLE_NEW_SUBQUERY_PLANNER} config should be set to
   * 'enable_new_subquery_stream_planner'.
   */
  @Config
  public static final ConfigEntry<Boolean> ENABLE_NEW_SUBQUERY_PLANNER =
      ConfigEntry.<Boolean>builder("enable_new_subquery_stream_planner")
          .setOptional(true)
          // Off by default: the legacy subquery rewrite program remains the default path.
          .withDefaultValue(false)
          .withConverter(BOOLEAN_PARSER)
          .withDoc("Optimizer config to enable new subquery expand in stream planner")
          .build();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.risingwave.planner.planner.batch;

import static com.risingwave.common.config.BatchPlannerConfigurations.ENABLE_NEW_SUBQUERY_PLANNER;
import static com.risingwave.planner.program.ChainedOptimizerProgram.OptimizerPhase;
import static com.risingwave.planner.program.ChainedOptimizerProgram.OptimizerPhase.DISTRIBUTED;
import static com.risingwave.planner.program.ChainedOptimizerProgram.OptimizerPhase.JOIN_REORDER;
Expand Down Expand Up @@ -56,7 +57,10 @@ public RelNode plan(
SqlNode ast,
ExecutionContext context,
BiFunction<ExecutionContext, RelCollation, OptimizerProgram> optimizerProgramProvider) {
SqlConverter sqlConverter = SqlConverter.builder(context).build();
SqlConverter sqlConverter =
SqlConverter.builder(context)
.withExpand(!context.getSessionConfiguration().get(ENABLE_NEW_SUBQUERY_PLANNER))
.build();
RelRoot rawRoot = sqlConverter.toRel(ast);
OptimizerProgram optimizerProgram = optimizerProgramProvider.apply(context, rawRoot.collation);
RelNode optimized = optimizerProgram.optimize(rawRoot.rel, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,10 @@

import com.google.common.collect.ImmutableList;
import com.risingwave.catalog.*;
import com.risingwave.common.config.StreamPlannerConfigurations;
import com.risingwave.execution.context.ExecutionContext;
import com.risingwave.planner.planner.Planner;
import com.risingwave.planner.program.ChainedOptimizerProgram;
import com.risingwave.planner.program.HepOptimizerProgram;
import com.risingwave.planner.program.JoinReorderProgram;
import com.risingwave.planner.program.OptimizerProgram;
import com.risingwave.planner.program.SubQueryRewriteProgram;
import com.risingwave.planner.program.VolcanoOptimizerProgram;
import com.risingwave.planner.program.*;
import com.risingwave.planner.rel.serialization.ExplainWriter;
import com.risingwave.planner.rel.streaming.*;
import com.risingwave.planner.rules.physical.BatchRuleSets;
Expand Down Expand Up @@ -62,10 +58,16 @@ public StreamPlanner(boolean testMode) {
@Override
public StreamingPlan plan(SqlNode ast, ExecutionContext context) {
SqlCreateMaterializedView create = (SqlCreateMaterializedView) ast;
SqlConverter sqlConverter = SqlConverter.builder(context).build();
SqlConverter sqlConverter =
SqlConverter.builder(context)
.withExpand(
!context
.getSessionConfiguration()
.get(StreamPlannerConfigurations.ENABLE_NEW_SUBQUERY_PLANNER))
.build();
RelNode rawPlan = sqlConverter.toRel(create.query).rel;
// Logical optimization.
OptimizerProgram optimizerProgram = buildLogicalOptimizerProgram();
OptimizerProgram optimizerProgram = buildLogicalOptimizerProgram(context);
RelNode logicalPlan = optimizerProgram.optimize(rawPlan, context);
log.debug("Logical plan: \n" + ExplainWriter.explainPlan(logicalPlan));
// Generate Streaming plan from logical plan.
Expand Down Expand Up @@ -218,10 +220,18 @@ private RwStreamMaterialize addMaterializedViewNode(
}
}

private OptimizerProgram buildLogicalOptimizerProgram() {
private OptimizerProgram buildLogicalOptimizerProgram(ExecutionContext context) {
ChainedOptimizerProgram.Builder builder = ChainedOptimizerProgram.builder(STREAMING);
// We use partial rules from batch planner until getting a RisingWave logical plan.
builder.addLast(SUBQUERY_REWRITE, SubQueryRewriteProgram.INSTANCE);
var useNewSubqueryPlanner =
context
.getSessionConfiguration()
.get(StreamPlannerConfigurations.ENABLE_NEW_SUBQUERY_PLANNER);
if (useNewSubqueryPlanner) {
builder.addLast(SUBQUERY_REWRITE, new SubQueryRewriteProgram2());
} else {
builder.addLast(SUBQUERY_REWRITE, SubQueryRewriteProgram.INSTANCE);
}

builder.addLast(
LOGICAL_REWRITE, HepOptimizerProgram.builder().addRules(LOGICAL_REWRITE_RULES).build());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.risingwave.planner.sql;

import static com.risingwave.common.config.BatchPlannerConfigurations.ENABLE_NEW_SUBQUERY_PLANNER;
import static java.util.Objects.requireNonNull;

import com.risingwave.common.datatype.RisingWaveTypeFactory;
Expand Down Expand Up @@ -59,6 +58,8 @@ public static class Builder {
private VolcanoPlanner planner = null;
private RelOptCluster cluster = null;

private boolean withExpand = true;

private Builder(ExecutionContext context) {
this.context = context;
this.rootSchema = context.getCalciteRootSchema();
Expand All @@ -69,6 +70,11 @@ public Builder withDefaultSchema(List<String> newDefaultSchema) {
return this;
}

public Builder withExpand(boolean doExpand) {
withExpand = doExpand;
return this;
}

public SqlConverter build() {

RisingWaveCalciteCatalogReader catalogReader =
Expand All @@ -85,7 +91,7 @@ public SqlConverter build() {
this.config =
this.config
.addRelBuilderConfigTransform(c -> c.withSimplify(false))
.withExpand(!context.getSessionConfiguration().get(ENABLE_NEW_SUBQUERY_PLANNER));
.withExpand(this.withExpand);

SqlToRelConverter sql2RelConverter =
new RisingWaveSqlToRelConverter(
Expand Down