Improve performance of Merge for MarkDistinct #26759
base: master
Please make sure we have tests for all the involved paths:
- all source rows unmatched
- a mix of matched and unmatched rows
- the same cases with a failure from a duplicate match
core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java
@kasiafi Thanks for your feedback.
The logical plan (Trino version 443) is:
In Fragment 2,
My current implementation is such that if a right join does not match any rows, it uses the … I'm not sure whether this implementation approach is feasible or whether there might be a better way to implement it. If this solution is feasible, I will proceed with the work.
Hi @hqbhoho, the problematic case is when unique IDs from the target get mixed with unique IDs from the source and they accidentally clash, producing a false duplicate. Unique IDs from the target and source get mixed when there is a partial match: some source rows match target rows, and some are unmatched. To disambiguate the target and source unique IDs, we propose a change to …
@kasiafi So it will check the rows that are matched; in other words, the same match condition cannot be used for an … I think after the right join, the … Only …
@hqbhoho Thanks for the explanation. You're right. The case number is also considered, and MERGE cases cannot overlap between the matched and unmatched rows. Your solution is thus correct. Feel free to proceed with your work.
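To make the argument above concrete, here is a minimal, self-contained Java sketch. It is not Trino code: `Row`, `markDistinct`, and the numbering (case 0 for `WHEN MATCHED`, case 1 for `WHEN NOT MATCHED`) are hypothetical stand-ins. It coalesces the target and source IDs and then marks rows distinct either on the ID alone or on the pair (unique ID, case number). A target ID and a source ID may collide, but because a matched row and an unmatched row never share a case number, the collision is not reported as a duplicate, while a genuine double match of one target row still is.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MergeMarkDistinctSketch
{
    // Hypothetical row after the right join: targetId is null when no target row matched
    record Row(Long targetId, long sourceId, int caseNumber) {}

    // Coalesce(targetUniqueId, sourceUniqueId), mirroring the proposed projection
    static long uniqueId(Row row)
    {
        return row.targetId() != null ? row.targetId() : row.sourceId();
    }

    // Marks a row true the first time its key is seen, false for every repeat
    static List<Boolean> markDistinct(List<Row> rows, boolean includeCaseNumber)
    {
        Set<List<Long>> seen = new HashSet<>();
        List<Boolean> isDistinct = new ArrayList<>();
        for (Row row : rows) {
            List<Long> key = includeCaseNumber
                    ? List.of(uniqueId(row), (long) row.caseNumber())
                    : List.of(uniqueId(row));
            isDistinct.add(seen.add(key));
        }
        return isDistinct;
    }

    public static void main(String[] args)
    {
        List<Row> clash = List.of(
                new Row(7L, 100L, 0),   // matched: target ID 7, UPDATE case
                new Row(null, 7L, 1));  // unmatched: source ID 7 collides with target ID 7
        System.out.println(markDistinct(clash, false)); // [true, false]: false duplicate
        System.out.println(markDistinct(clash, true));  // [true, true]: no false duplicate

        // A real duplicate: target row 7 matched by two source rows under the same case
        List<Row> duplicate = List.of(new Row(7L, 100L, 0), new Row(7L, 101L, 0));
        System.out.println(markDistinct(duplicate, true)); // [true, false]
    }
}
```

Keying on the pair is what lets the two ID spaces share one column without a separate disambiguation step.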
Reviewer's Guide

Enhance Merge planner to assign and coalesce unique IDs for both target and source rows, improving parallelism in mark distinct operations and updating projection, join, and filter logic accordingly, with new test coverage to validate the plan.

Sequence diagram for unique ID assignment and coalescing in Merge planning

```mermaid
sequenceDiagram
    participant Planner
    participant TargetTable
    participant SourceTable
    participant Join
    participant Project
    participant MarkDistinct
    Planner->>TargetTable: Assign unique_id (targetUniqueIdSymbol)
    Planner->>SourceTable: Assign unique_id (sourceUniqueIdSymbol)
    Planner->>Join: Join TargetTable and SourceTable
    Join->>Project: Project with Coalesce(targetUniqueIdSymbol, sourceUniqueIdSymbol) as uniqueIdSymbol
    Project->>MarkDistinct: Mark distinct on [uniqueIdSymbol, caseNumberSymbol]
```
Class diagram for updated unique ID assignment in Merge planning

```mermaid
classDiagram
    class QueryPlanner {
        +plan(Merge merge)
    }
    class AssignUniqueId {
        +AssignUniqueId(id, root, symbol)
    }
    class RelationPlan {
        +RelationPlan(root, scope, fieldMappings, outerContext)
    }
    class ProjectNode {
        +ProjectNode(id, root, assignments)
    }
    class MarkDistinctNode {
        +MarkDistinctNode(id, source, isDistinctSymbol, distinctSymbols)
    }
    QueryPlanner --> AssignUniqueId : uses
    QueryPlanner --> RelationPlan : creates
    QueryPlanner --> ProjectNode : creates
    QueryPlanner --> MarkDistinctNode : creates
    RelationPlan *-- AssignUniqueId : composition
    RelationPlan *-- ProjectNode : composition
    MarkDistinctNode *-- ProjectNode : composition
    QueryPlanner : -targetUniqueIdSymbol
    QueryPlanner : -sourceUniqueIdSymbol
    QueryPlanner : -uniqueIdSymbol
    QueryPlanner : -Coalesce(targetUniqueIdSymbol, sourceUniqueIdSymbol)
```
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `core/trino-main/src/test/java/io/trino/sql/planner/TestMerge.java:102-103` </location>
<code_context>
+ return planTester;
+ }
+
+ @Test
+ public void testMergeWithSimpleSelect()
+ {
+ // one join
</code_context>
<issue_to_address>
**suggestion (testing):** Missing tests for edge cases such as duplicate source rows, null values, and empty tables.
Please add tests for duplicate source rows, nulls in join columns, empty tables, and non-matching source rows to ensure the unique_id logic and MarkDistinct behavior are robust.
</issue_to_address>
### Comment 2
<location> `core/trino-main/src/test/java/io/trino/sql/planner/TestMerge.java:103-106` </location>
<code_context>
+ }
+
+ @Test
+ public void testMergeWithSimpleSelect()
+ {
+ // one join
+ assertPlan(
+ "MERGE INTO test_table_merge_target a " +
+ "USING test_table_merge_source b " +
</code_context>
<issue_to_address>
**suggestion (testing):** Test assertions focus on plan structure but do not verify runtime behavior or error conditions.
Please add tests that confirm expected errors are raised and that parallelism is achieved, such as by verifying task distribution or output row counts during execution.
Suggested implementation:
```java
    @Test
    public void testMergeWithSimpleSelect()
    {
        // one join
        assertPlan(
                "MERGE INTO test_table_merge_target a " +
                        "USING test_table_merge_source b " +
                        "ON a.column1 = b.column1 " +
                        "WHEN MATCHED " +
                        "THEN UPDATE SET column2 = b.column2 " +
                        "WHEN NOT MATCHED " +
                        "THEN INSERT (column1, column2) VALUES (b.column1, b.column2)",
                anyTree(
                        filter(
                                new Case(
                                        ImmutableList.of(new WhenClause(new Call(NOT, ImmutableList.of(new Reference(BOOLEAN, "is_distinct"))), new Cast(new Call(FAIL, ImmutableList.of(new Constant(INTEGER, (long) MERGE_TARGET_ROW_MULTIPLE_MATCHES.toErrorCode().getCode()), new Constant(VARCHAR, utf8Slice("One MERGE target table row matched more than one source row")))), BOOLEAN))),
                                        TRUE),
                                // The pattern for the filter's source was cut off in this suggestion;
                                // supply it from the expected plan before the test will compile.
                                /* source pattern elided */)));
    }

    @Test
    public void testMergeRaisesErrorOnMultipleMatches()
    {
        // Duplicating the source with UNION ALL makes each matched target row match two source rows
        try {
            computeActual(
                    "MERGE INTO test_table_merge_target a " +
                            "USING (SELECT column1, column2 FROM test_table_merge_source UNION ALL SELECT column1, column2 FROM test_table_merge_source) b " +
                            "ON a.column1 = b.column1 " +
                            "WHEN MATCHED THEN UPDATE SET column2 = b.column2");
            // fail() throws an AssertionError, which the RuntimeException handler below does not catch
            fail("Expected exception due to multiple matches");
        }
        catch (RuntimeException e) {
            assertTrue(e.getMessage().contains("One MERGE target table row matched more than one source row"));
        }
    }

    @Test
    public void testMergeParallelismAndOutputRowCount()
    {
        // A MERGE returns a single row holding the number of affected rows, so the
        // result's row count is not the merged row count; assertUpdate checks the update count
        assertUpdate(
                "MERGE INTO test_table_merge_target a " +
                        "USING test_table_merge_source b " +
                        "ON a.column1 = b.column1 " +
                        "WHEN MATCHED THEN UPDATE SET column2 = b.column2 " +
                        "WHEN NOT MATCHED THEN INSERT (column1, column2) VALUES (b.column1, b.column2)",
                expectedRowCount());
        // Check parallelism by verifying multiple tasks were used
        int taskCount = getTaskCountForLastQuery();
        assertTrue(taskCount > 1, "Expected parallel execution with multiple tasks");
    }

    // Helper methods for parallelism and expected row count (placeholders)
    private long expectedRowCount()
    {
        // Replace with logic to compute expected row count based on test data
        return 4;
    }

    private int getTaskCountForLastQuery()
    {
        // Placeholder: a hard-coded return value would make the assertion above pass
        // unconditionally, so fail until this reads the task count from query statistics
        throw new UnsupportedOperationException("Read the task count from the last query's statistics");
    }
```
- You may need to adjust the helper methods (`expectedRowCount`, `getTaskCountForLastQuery`) to match your test infrastructure and actual data.
- Ensure that `computeActual` and query statistics APIs are available in your test base class.
- If your test framework provides more direct ways to check parallelism (e.g., via query info or session properties), use those instead of the mocked example.
</issue_to_address>
@kasiafi PTAL when you have time, thanks!
Description
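Currently, `uniqueIdSymbol` comes only from the target table. In `INSERT` scenarios (source rows that match no target row), `unique_id` is always null. Because these rows are hash-partitioned on the MarkDistinct keys [`unique_id`, case number], and they all share a null `unique_id` and the same case number, they all land in one partition: the entire dataset is processed by a single task, severely limiting parallelism and performance.
Additional context and related issues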
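The single-task bottleneck described above can be illustrated with a toy model. The sketch below is not Trino's exchange code: it assumes rows are routed to one of `PARTITIONS` workers by hashing the MarkDistinct key (unique ID, case number) with `Objects.hash`, and `partitionOf`, `distribute`, `PARTITIONS`, and the row counts are hypothetical. Trino uses its own hash function, but the point carries over: a constant key always maps to the same partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MergePartitioningSketch
{
    static final int PARTITIONS = 8;

    // Hypothetical router: hash the distinct key and pick a partition
    static int partitionOf(Long uniqueId, int caseNumber)
    {
        return Math.floorMod(Objects.hash(uniqueId, caseNumber), PARTITIONS);
    }

    // Counts rows per partition for an INSERT-only MERGE of n unmatched source rows
    static Map<Integer, Integer> distribute(int n, boolean coalesceSourceId)
    {
        Map<Integer, Integer> rowsPerPartition = new HashMap<>();
        int insertCase = 1; // every unmatched row takes the same WHEN NOT MATCHED case
        for (long sourceId = 0; sourceId < n; sourceId++) {
            Long uniqueId = null; // old behavior: the target-side ID, null for unmatched rows
            if (coalesceSourceId) {
                uniqueId = sourceId; // new behavior: coalesce(targetId, sourceId) falls back to the source ID
            }
            rowsPerPartition.merge(partitionOf(uniqueId, insertCase), 1, Integer::sum);
        }
        return rowsPerPartition;
    }

    public static void main(String[] args)
    {
        // prints "target ID only: 1 partition(s) used"
        System.out.println("target ID only: " + distribute(10_000, false).size() + " partition(s) used");
        // prints "coalesced IDs: 8 partition(s) used"
        System.out.println("coalesced IDs: " + distribute(10_000, true).size() + " partition(s) used");
    }
}
```

With only the target-side ID, all 10,000 rows share the key (null, 1) and pile into one partition; coalescing in the source-side ID gives each row its own key, so the rows spread across every partition.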
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(X) Release notes are required, with the following suggested text:
Summary by Sourcery
Assign unique IDs to both source and target rows during merge planning, coalesce them into a unified unique_id for mark distinct operations, update join and projection logic to guarantee non-null unique_id, and add tests to validate the updated merge plan.
Enhancements:
Tests: