
Conversation

hqbhoho
Contributor

@hqbhoho hqbhoho commented Sep 30, 2025

Description

Currently, uniqueIdSymbol comes from the target table. In INSERT (NOT MATCHED) scenarios, unique_id is therefore always null, which causes the entire dataset to be processed by a single task, severely limiting parallelism and performance.

// Assign a unique id to every target table row
Symbol uniqueIdSymbol = symbolAllocator.newSymbol("unique_id", BIGINT);
RelationPlan planWithUniqueId = new RelationPlan(
        new AssignUniqueId(idAllocator.getNextId(), targetTablePlan.getRoot(), uniqueIdSymbol),
        mergeAnalysis.getTargetTableScope(),
        targetTablePlan.getFieldMappings(),
        outerContext);

// Mark distinct combinations of the unique_id value and the case_number
Symbol isDistinctSymbol = symbolAllocator.newSymbol("is_distinct", BOOLEAN);
MarkDistinctNode markDistinctNode = new MarkDistinctNode(
        idAllocator.getNextId(),
        project,
        isDistinctSymbol,
        ImmutableList.of(uniqueIdSymbol, caseNumberSymbol));
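
A rough sketch of the direction this PR takes (not the exact patch): assign a unique id to the source rows as well, and coalesce the two ids after the join so every row, matched or not, carries a non-null unique_id. Names such as sourcePlan, project, projectionAssignmentsBuilder, caseNumberSymbol, and the Coalesce expression class are assumed from the surrounding QueryPlanner context.

// Sketch only, assuming the surrounding QueryPlanner context; API names such as Coalesce are assumptions.

// Assign a unique id to every target table row (as today, renamed for clarity)
Symbol targetUniqueIdSymbol = symbolAllocator.newSymbol("unique_id", BIGINT);
RelationPlan targetPlanWithUniqueId = new RelationPlan(
        new AssignUniqueId(idAllocator.getNextId(), targetTablePlan.getRoot(), targetUniqueIdSymbol),
        mergeAnalysis.getTargetTableScope(),
        targetTablePlan.getFieldMappings(),
        outerContext);

// Also assign a unique id to every source row (sourcePlan is assumed here)
Symbol sourceUniqueIdSymbol = symbolAllocator.newSymbol("unique_id", BIGINT);
RelationPlan sourcePlanWithUniqueId = new RelationPlan(
        new AssignUniqueId(idAllocator.getNextId(), sourcePlan.getRoot(), sourceUniqueIdSymbol),
        sourcePlan.getScope(),
        sourcePlan.getFieldMappings(),
        outerContext);

// After the RIGHT JOIN, project unique_id := coalesce(target id, source id) so that
// NOT MATCHED (insert) rows are no longer all null and can hash to many tasks
Symbol uniqueIdSymbol = symbolAllocator.newSymbol("unique_id", BIGINT);
projectionAssignmentsBuilder.put(
        uniqueIdSymbol,
        new Coalesce(targetUniqueIdSymbol.toSymbolReference(), sourceUniqueIdSymbol.toSymbolReference()));

// MarkDistinct keys on the coalesced, always non-null unique_id plus the case number
MarkDistinctNode markDistinctNode = new MarkDistinctNode(
        idAllocator.getNextId(),
        project,
        isDistinctSymbol,
        ImmutableList.of(uniqueIdSymbol, caseNumberSymbol));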

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(X) Release notes are required, with the following suggested text:

## General
* Improve performance of MERGE for MarkDistinct in the NOT MATCHED case. ({issue}`26759`)

Summary by Sourcery

Assign unique IDs to both source and target rows during merge planning, coalesce them into a unified unique_id for mark distinct operations, update the join and projection logic to guarantee a non-null unique_id, and add tests to validate the updated merge plan.

Enhancements:

  • Assign unique IDs to both target and source rows during merge planning to improve parallelism for NOT MATCHED cases
  • Coalesce target and source unique ID symbols into a unified unique_id symbol for mark distinct operations
  • Update join and projection logic to propagate sourceUniqueId and targetUniqueId symbols and ensure unique_id is always non-null
  • Simplify the mark distinct filter by removing explicit null checks on the unified unique_id

Tests:

  • Add TestMerge to validate AssignUniqueId usage and the coalesced unique_id in merge plans

@cla-bot cla-bot bot added the cla-signed label Sep 30, 2025
@hqbhoho hqbhoho marked this pull request as draft September 30, 2025 08:21
@hqbhoho hqbhoho force-pushed the feat/merge_plan branch 2 times, most recently from a881834 to 2f5848e on September 30, 2025 10:17
Member

@kasiafi kasiafi left a comment

Please make sure we have tests for all the involved paths:

  • all source rows unmatched
  • a mix of matched and unmatched rows
  • the same cases with a failure from a duplicate match

@hqbhoho
Contributor Author

hqbhoho commented Oct 9, 2025

@kasiafi Thanks for your feedback.
In the following example:

trino> select * from iceberg.tmp.test_merge_01;
 id | name
----+------
(0 rows)

trino> select * from iceberg.tmp.test_merge_02;
 id | name
----+------
  1 |    1
  2 |    2
(2 rows)

trino> EXPLAIN MERGE INTO iceberg.tmp.test_merge_01 a
    -> using iceberg.tmp.test_merge_02 b
    -> on a.id = b.id
    -> WHEN MATCHED
    ->         THEN UPDATE SET name = b.name
    ->     WHEN NOT MATCHED
    ->         THEN INSERT (id ,name)
    ->               VALUES(b.id, b.name);

The logical plan (Trino version 443) is:

Fragment 0 [COORDINATOR_ONLY]                                                                                                                                                                                     >
     Output layout: [rows]                                                                                                                                                                                         >
     Output partitioning: SINGLE []                                                                                                                                                                                >
     Output[columnNames = [rows]]                                                                                                                                                                                  >
     │   Layout: [rows:bigint]                                                                                                                                                                                     >
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}                                                                                                                                                 >
     └─ TableCommit[target = iceberg:tmp.test_merge_01$data@1242211021876735512]                                                                                                                                   >
        │   Layout: [rows:bigint]                                                                                                                                                                                  >
        └─ LocalExchange[partitioning = SINGLE]                                                                                                                                                                    >
           │   Layout: [partialrows:bigint, fragment:varbinary]                                                                                                                                                    >
           │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}                                                                                                                                           >
           └─ RemoteSource[sourceFragmentIds = [1]]                                                                                                                                                                >
                  Layout: [partialrows:bigint, fragment:varbinary]                                                                                                                                                 >
                                                                                                                                                                                                                   >
 Fragment 1 [MERGE [update = iceberg:INSTANCE]]                                                                                                                                                                    >
     Output layout: [partialrows, fragment]                                                                                                                                                                        >
     Output partitioning: SINGLE []                                                                                                                                                                                >
     MergeWriter[table = iceberg:tmp.test_merge_01$data@1242211021876735512]                                                                                                                                       >
     │   Layout: [partialrows:bigint, fragment:varbinary]                                                                                                                                                          >
     └─ LocalExchange[partitioning = MERGE [update = iceberg:INSTANCE], arguments = [SymbolReference[type=tinyint, name=operation], SymbolReference[type=row(_file varchar, _pos bigint, partition_spec_id integer,>
        │   Layout: [id:integer, name:integer, operation:tinyint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), insert_from_update:tinyint]                            >
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                              >
        └─ RemoteSource[sourceFragmentIds = [2]]                                                                                                                                                                   >
               Layout: [id:integer, name:integer, operation:tinyint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), insert_from_update:tinyint]                         >
                                                                                                                                                                                                                   >
 Fragment 2 [HASH]                                                                                                                                                                                                 >
     Output layout: [id, name, operation, field, insert_from_update]                                                                                                                                               >
     Output partitioning: MERGE [update = iceberg:INSTANCE] [operation, field]                                                                                                                                     >
     MergeProcessor[]                                                                                                                                                                                              >
     │   Layout: [id:integer, name:integer, operation:tinyint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), insert_from_update:tinyint]                               >
     │   target: iceberg:tmp.test_merge_01$data@1242211021876735512                                                                                                                                                >
     │   merge row column: merge_row                                                                                                                                                                               >
     │   row id column: field                                                                                                                                                                                      >
     │   redistribution columns: []                                                                                                                                                                                >
     │   data columns: [id, name]                                                                                                                                                                                  >
     └─ Filter[filterPredicate = (CASE WHEN ((NOT is_distinct) AND (NOT (unique_id IS NULL))) THEN CAST(system.builtin.fail(integer '119', varchar 'One MERGE target table row matched more than one source row') A>
        │   Layout: [unique_id:bigint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), merge_row:row(id integer, name integer, boolean, tinyint, integer), case_number:in>
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                              >
        └─ MarkDistinct[distinct = [unique_id:bigint, case_number:integer], marker = is_distinct]                                                                                                                  >
           │   Layout: [unique_id:bigint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), merge_row:row(id integer, name integer, boolean, tinyint, integer), case_number>
           └─ LocalExchange[partitioning = HASH, arguments = [SymbolReference[type=bigint, name=unique_id], SymbolReference[type=integer, name=case_number]]]                                                      >
              │   Layout: [unique_id:bigint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), merge_row:row(id integer, name integer, boolean, tinyint, integer), case_num>
              │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                        >
              └─ RemoteSource[sourceFragmentIds = [3]]                                                                                                                                                             >
                     Layout: [unique_id:bigint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), merge_row:row(id integer, name integer, boolean, tinyint, integer), case_>
                                                                                                                                                                                                                   >
 Fragment 3 [HASH]                                                                                                                                                                                                 >
     Output layout: [unique_id, field, merge_row, case_number]                                                                                                                                                     >
     Output partitioning: HASH [unique_id, case_number]                                                                                                                                                            >
     Project[]                                                                                                                                                                                                     >
     │   Layout: [unique_id:bigint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), merge_row:row(id integer, name integer, boolean, tinyint, integer), case_number:integ>
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                                 >
     │   case_number := merge_row[integer '5']                                                                                                                                                                     >
     └─ Project[]                                                                                                                                                                                                  >
        │   Layout: [unique_id:bigint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), merge_row:row(id integer, name integer, boolean, tinyint, integer)]               >
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                              >
        │   merge_row := (CASE WHEN present THEN ROW (id, name_1, (NOT (present IS NULL)), tinyint '3', integer '0') WHEN (present IS NULL) THEN ROW (id_0, name_1, (NOT (present IS NULL)), tinyint '1', integer '>
        └─ RightJoin[criteria = (id = id_0), distribution = PARTITIONED]                                                                                                                                           >
           │   Layout: [unique_id:bigint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), id:integer, present:boolean, id_0:integer, name_1:integer]                     >
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}                                                                                                                                            >
           │   Distribution: PARTITIONED                                                                                                                                                                           >
           │   dynamicFilterAssignments = {id_0 -> #df_727}                                                                                                                                                        >
           ├─ RemoteSource[sourceFragmentIds = [4]]                                                                                                                                                                >
           │      Layout: [unique_id:bigint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), id:integer, present:boolean]                                                >
           └─ LocalExchange[partitioning = HASH, arguments = [SymbolReference[type=integer, name=id_0]]]                                                                                                           >
              │   Layout: [id_0:integer, name_1:integer]                                                                                                                                                           >
              │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                        >
              └─ RemoteSource[sourceFragmentIds = [5]]                                                                                                                                                             >
                     Layout: [id_0:integer, name_1:integer]                                                                                                                                                        >
                                                                                                                                                                                                                   >
 Fragment 4 [SOURCE]                                                                                                                                                                                               >
     Output layout: [unique_id, field, id, present]                                                                                                                                                                >
     Output partitioning: HASH [id]                                                                                                                                                                                >
     Project[]                                                                                                                                                                                                     >
     │   Layout: [unique_id:bigint, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), id:integer, present:boolean]                                                         >
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                                 >
     │   present := boolean 'true'                                                                                                                                                                                 >
     └─ AssignUniqueId[]                                                                                                                                                                                           >
        │   Layout: [id:integer, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar), unique_id:bigint]                                                                       >
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                              >
        └─ ScanFilter[table = iceberg:tmp.test_merge_01$data@1242211021876735512, dynamicFilters = {id = #df_727}]                                                                                                 >
               Layout: [id:integer, field:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar)]                                                                                      >
               Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                            >
               field := -2147483647:$row_id:row(_file varchar, _pos bigint, partition_spec_id integer, partition_data varchar)                                                                                     >
               id := 1:id:integer                                                                                                                                                                                  >
                                                                                                                                                                                                                   >
 Fragment 5 [SOURCE]                                                                                                                                                                                               >
     Output layout: [id_0, name_1]                                                                                                                                                                                 >
     Output partitioning: HASH [id_0]                                                                                                                                                                              >
     TableScan[table = iceberg:tmp.test_merge_02$data@3640267961122064680]                                                                                                                                         >
         Layout: [id_0:integer, name_1:integer]                                                                                                                                                                    >
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                                 >
         name_1 := 2:name:integer                                                                                                                                                                                  >
         id_0 := 1:id:integer

In Fragment 2, MarkDistinct is keyed on unique_id and case_number. When inserting new rows into the target table, the Right Join does not match any target rows, so the unique_id of the newly joined rows is always null. This causes all newly inserted data to be processed within a single task during MarkDistinct.

 MarkDistinct[distinct = [unique_id:bigint, case_number:integer], marker = is_distinct] 

My current implementation is: if the Right Join does not match a row, it uses the unique_id generated on the source side instead of null, so newly inserted data is processed across multiple tasks during MarkDistinct. Therefore, this approach would not break the existing semantics.

I'm not sure whether this implementation approach is feasible, or whether there might be a better way to implement it. If this solution is feasible, I will proceed with the work.

@kasiafi
Member

kasiafi commented Oct 9, 2025

Hi @hqbhoho
in the example you mention, it's indeed correct. In the case when there are no matches (which happens when the target table is empty), the coalesce returns the unique id from the source for each row.

The problematic case is when unique ids from the target get mixed with unique ids from the source, and they accidentally clash -- so we have a false duplicate. Unique ids from target and source get mixed when there is a partial match: some source rows match target rows, and some are unmatched.

To disambiguate the target and source unique ids, we propose a change to the AssignUniqueId operator. We will allow an additional mask to be used to make sure that different AssignUniqueId operations produce non-overlapping results. @losipiuk is working on it.
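
To make the masking idea concrete, here is a toy sketch of what such disambiguation could look like (an assumption about the approach, not @losipiuk's actual change): each AssignUniqueId instance would stamp its ids with a distinct high-bit mask so two operators can never produce the same value.

// Toy illustration only; the mask layout and class are assumptions, not Trino code.
public class MaskedUniqueIdExample
{
    private final long operatorMask; // distinct per AssignUniqueId instance
    private long counter;

    MaskedUniqueIdExample(long operatorMask)
    {
        this.operatorMask = operatorMask;
    }

    long nextUniqueId()
    {
        // OR the mask into the high bits so ranges from different operators never overlap
        return operatorMask | counter++;
    }

    public static void main(String[] args)
    {
        MaskedUniqueIdExample targetIds = new MaskedUniqueIdExample(0L);
        MaskedUniqueIdExample sourceIds = new MaskedUniqueIdExample(1L << 62);
        System.out.println(targetIds.nextUniqueId()); // 0
        System.out.println(sourceIds.nextUniqueId()); // 4611686018427387904, disjoint from target ids
    }
}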

@hqbhoho
Contributor Author

hqbhoho commented Oct 9, 2025

@kasiafi
In the current implementation, a MarkDistinct node adds an is_distinct column which is true if no other row has the same unique id and WHEN clause number, and false otherwise.

So it only checks rows that are matched; in other words, the same target row cannot be matched by more than one source row for an UPDATE operation.
But rows that are not matched are not checked, because unique_id is always null, so we can insert two identical rows when the row is not in the target table. Is this correct?

I think that after the right join, the unique_id should not be null whether the row is matched or not. Currently, the unique_id comes from the target table, so after the right join it can be null when there is no match.

Only NOT MATCHED rows will use the source table unique_id, so there will be no problem, because the additional case_number field will ensure uniqueness.
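
To illustrate the point (example values assumed): even if a target unique_id and a source unique_id happen to collide numerically, the case_number keeps the MarkDistinct keys apart, because a MATCHED clause and a NOT MATCHED clause can never share a clause number.

import java.util.Set;

// Toy illustration, not Trino code; clause numbers 0 (MATCHED/UPDATE) and 1 (NOT MATCHED/INSERT) are assumed.
public class MergeDistinctKeyExample
{
    record DistinctKey(long uniqueId, int caseNumber) {}

    public static void main(String[] args)
    {
        DistinctKey matchedRow = new DistinctKey(7, 0);   // unique_id from the target side
        DistinctKey unmatchedRow = new DistinctKey(7, 1); // unique_id from the source side, same raw value

        // Same unique_id value, different case_number: still two distinct keys,
        // so MarkDistinct raises no false "matched more than one source row" failure.
        Set<DistinctKey> keys = Set.of(matchedRow, unmatchedRow);
        System.out.println("distinct keys: " + keys.size()); // prints 2
    }
}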

@kasiafi
Member

kasiafi commented Oct 9, 2025

Only NOT MATCHED rows will use the source table unique_id, so there will be no problem, because the additional case_number field will ensure uniqueness.

@hqbhoho thanks for the explanation. You're right. The case number is also considered, and MERGE cases cannot overlap between the matched and unmatched rows. Your solution is thus correct.
We can drop @losipiuk change for now, and revisit it when there's need to disambiguate between different AssignUniqueId operations.

Feel free to proceed with your work.

@hqbhoho hqbhoho marked this pull request as ready for review October 11, 2025 11:17

sourcery-ai bot commented Oct 11, 2025

Reviewer's Guide

Enhance Merge planner to assign and coalesce unique IDs for both target and source rows, improving parallelism in mark distinct operations and updating projection, join, and filter logic accordingly, with new test coverage to validate the plan.

Sequence diagram for unique ID assignment and coalescing in Merge planning

sequenceDiagram
    participant Planner
    participant TargetTable
    participant SourceTable
    participant Join
    participant Project
    participant MarkDistinct
    Planner->>TargetTable: Assign unique_id (targetUniqueIdSymbol)
    Planner->>SourceTable: Assign unique_id (sourceUniqueIdSymbol)
    Planner->>Join: Join TargetTable and SourceTable
    Join->>Project: Project with Coalesce(targetUniqueIdSymbol, sourceUniqueIdSymbol) as uniqueIdSymbol
    Project->>MarkDistinct: Mark distinct on [uniqueIdSymbol, caseNumberSymbol]

Class diagram for updated unique ID assignment in Merge planning

classDiagram
    class QueryPlanner {
        +plan(Merge merge)
    }
    class AssignUniqueId {
        +AssignUniqueId(id, root, symbol)
    }
    class RelationPlan {
        +RelationPlan(root, scope, fieldMappings, outerContext)
    }
    class ProjectNode {
        +ProjectNode(id, root, assignments)
    }
    class MarkDistinctNode {
        +MarkDistinctNode(id, source, isDistinctSymbol, distinctSymbols)
    }
    QueryPlanner --> AssignUniqueId : uses
    QueryPlanner --> RelationPlan : creates
    QueryPlanner --> ProjectNode : creates
    QueryPlanner --> MarkDistinctNode : creates
    AssignUniqueId <|-- RelationPlan : composition
    ProjectNode <|-- RelationPlan : composition
    MarkDistinctNode <|-- ProjectNode : composition
    QueryPlanner : -targetUniqueIdSymbol
    QueryPlanner : -sourceUniqueIdSymbol
    QueryPlanner : -uniqueIdSymbol
    QueryPlanner : -Coalesce(targetUniqueIdSymbol, sourceUniqueIdSymbol)

File-Level Changes

Change | Details | Files
Assign distinct unique IDs to target and source rows to enable parallel mark distinct operations
  • Renamed uniqueIdSymbol to targetUniqueIdSymbol for target rows
  • Introduced AssignUniqueId on source root with sourceUniqueIdSymbol
  • Updated planJoin to use sourcePlanWithUniqueId
core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java
Coalesce target and source unique ID symbols into a single unique_id for marking distinct combinations
  • Created a new uniqueIdSymbol via Coalesce(target_unique_id, source_unique_id) expression
  • Updated projectionAssignmentsBuilder to project the coalesced unique_id
  • Adjusted MarkDistinctNode to use the coalesced unique_id
core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java
Refactor identity assignments and filter logic around unique_id and constraint checks
  • Added identity assignments for both targetUniqueIdSymbol and sourceUniqueIdSymbol
  • Switched to sourcePlanWithUniqueId.getFieldMappings() for identity assignments
  • Simplified filter expression to only check isDistinct (null check no longer needed)
core/trino-main/src/main/java/io/trino/sql/planner/QueryPlanner.java
Add comprehensive plan test for Merge with markDistinct using unique IDs
  • Introduced TestMerge.java with testMergeWithSimpleSelect
  • Asserted assignUniqueId usage for both target and source
  • Validated coalesced projection, join structure, and markDistinct/filter behavior
core/trino-main/src/test/java/io/trino/sql/planner/TestMerge.java

Possibly linked issues

  • #N/A: The PR ensures unique IDs for MarkDistinct in MERGE, preventing duplicates in distributed WHEN MATCHED operations.


@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `core/trino-main/src/test/java/io/trino/sql/planner/TestMerge.java:102-103` </location>
<code_context>
+        return planTester;
+    }
+
+    @Test
+    public void testMergeWithSimpleSelect()
+    {
+        // one join
</code_context>

<issue_to_address>
**suggestion (testing):** Missing tests for edge cases such as duplicate source rows, null values, and empty tables.

Please add tests for duplicate source rows, nulls in join columns, empty tables, and non-matching source rows to ensure the unique_id logic and MarkDistinct behavior are robust.
</issue_to_address>

### Comment 2
<location> `core/trino-main/src/test/java/io/trino/sql/planner/TestMerge.java:103-106` </location>
<code_context>
+    }
+
+    @Test
+    public void testMergeWithSimpleSelect()
+    {
+        // one join
+        assertPlan(
+                "MERGE INTO test_table_merge_target a " +
+                        "USING test_table_merge_source b " +
</code_context>

<issue_to_address>
**suggestion (testing):** Test assertions focus on plan structure but do not verify runtime behavior or error conditions.

Please add tests that confirm expected errors are raised and that parallelism is achieved, such as by verifying task distribution or output row counts during execution.

Suggested implementation:

```java
    @Test
    public void testMergeWithSimpleSelect()
    {
        // one join
        assertPlan(
                "MERGE INTO test_table_merge_target a " +
                        "USING test_table_merge_source b " +
                        "ON a.column1 = b.column1 " +
                        "WHEN MATCHED " +
                        "THEN UPDATE SET column2 = b.column2 " +
                        "WHEN NOT MATCHED " +
                        "THEN INSERT (column1 ,column2) VALUES (b.column1, b.column2)",
                anyTree(
                        filter(
                                new Case(
                                        ImmutableList.of(new WhenClause(new Call(NOT, ImmutableList.of(new Reference(BOOLEAN, "is_distinct"))), new Cast(new Call(FAIL, ImmutableList.of(new Constant(INTEGER, (long) MERGE_TARGET_ROW_MULTIPLE_MATCHES.toErrorCode().getCode()), new Constant(VARCHAR, utf8Slice("One MERGE target table row matched more than one source row")))), BOOLEAN))),
                                        TRUE),
        );
    }

    @Test
    public void testMergeRaisesErrorOnMultipleMatches()
    {
        // Simulate a MERGE that should fail due to multiple matches
        try {
            computeActual(
                    "MERGE INTO test_table_merge_target a " +
                            "USING (SELECT column1, column2 FROM test_table_merge_source UNION ALL SELECT column1, column2 FROM test_table_merge_source) b " +
                            "ON a.column1 = b.column1 " +
                            "WHEN MATCHED THEN UPDATE SET column2 = b.column2"
            );
            fail("Expected exception due to multiple matches");
        }
        catch (RuntimeException e) {
            assertTrue(e.getMessage().contains("One MERGE target table row matched more than one source row"));
        }
    }

    @Test
    public void testMergeParallelismAndOutputRowCount()
    {
        // Simulate a MERGE and check that output row count matches expectation and parallelism is achieved
        MaterializedResult result = computeActual(
                "MERGE INTO test_table_merge_target a " +
                        "USING test_table_merge_source b " +
                        "ON a.column1 = b.column1 " +
                        "WHEN MATCHED THEN UPDATE SET column2 = b.column2 " +
                        "WHEN NOT MATCHED THEN INSERT (column1, column2) VALUES (b.column1, b.column2)"
        );
        // Check output row count
        assertEquals(result.getRowCount(), expectedRowCount());

        // Check parallelism by verifying multiple tasks were used (mocked example)
        int taskCount = getTaskCountForLastQuery();
        assertTrue(taskCount > 1, "Expected parallel execution with multiple tasks");
    }

    // Helper methods for parallelism and expected row count
    private int expectedRowCount()
    {
        // Replace with logic to compute expected row count based on test data
        return 4;
    }

    private int getTaskCountForLastQuery()
    {
        // Replace with logic to fetch task count from query statistics
        // For demonstration, return a mocked value
        return 3;
    }

```

- You may need to adjust the helper methods (`expectedRowCount`, `getTaskCountForLastQuery`) to match your test infrastructure and actual data.
- Ensure that `computeActual` and query statistics APIs are available in your test base class.
- If your test framework provides more direct ways to check parallelism (e.g., via query info or session properties), use those instead of the mocked example.
</issue_to_address>


@hqbhoho
Contributor Author

hqbhoho commented Oct 13, 2025

@kasiafi PTAL when you have time, thanks!

