
Conversation

@codluca (Member) commented Oct 10, 2025

Add procedures support for Lakehouse

Fixes #26753

Description

The LakehouseModule declares the procedures from Hive, Delta Lake, and Iceberg.
If the same procedure appears in two modules, a wrapper procedure is declared in the 'procedures' subpackage.
The wrapper procedure takes an extra first parameter named 'tableType' (possible values: 'HIVE', 'DELTA', and 'ICEBERG').
Based on the 'tableType', the wrapper delegates to the actual procedure in the concrete module.
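
A minimal, self-contained sketch of that dispatch pattern, modeled on the drop_stats wrapper (the delegate interfaces below are hypothetical stand-ins for the concrete Hive and Delta Lake procedure classes, whose real methods also take ConnectorSession and ConnectorAccessControl):

```java
import java.util.List;

// Sketch only: the real wrapper lives in io.trino.plugin.lakehouse.procedures
// and injects the concrete Hive and Delta Lake procedure classes.
public class DropStatsDispatchSketch
{
    public enum TableType { HIVE, DELTA, ICEBERG }

    // Hypothetical stand-ins for the injected delegate procedures
    public interface HiveDropStats { void dropStats(String schema, String table, List<?> partitionValues); }
    public interface DeltaDropStats { void dropStats(String schema, String table); }

    private final HiveDropStats hiveDelegate;
    private final DeltaDropStats deltaDelegate;

    public DropStatsDispatchSketch(HiveDropStats hiveDelegate, DeltaDropStats deltaDelegate)
    {
        this.hiveDelegate = hiveDelegate;
        this.deltaDelegate = deltaDelegate;
    }

    public void dropStats(String tableType, String schema, String table, List<?> partitionValues)
    {
        if (TableType.DELTA.name().equals(tableType)) {
            // Delta Lake drops extended stats for the whole table only
            if (partitionValues != null) {
                throw new IllegalArgumentException("Partition values are not supported for Delta Lake procedure");
            }
            deltaDelegate.dropStats(schema, table);
        }
        else if (TableType.HIVE.name().equals(tableType)) {
            hiveDelegate.dropStats(schema, table, partitionValues);
        }
        else {
            throw new IllegalArgumentException("Unsupported table type: " + tableType);
        }
    }
}
```

The Delta branch's null check and error message match the excerpt quoted in the review comments further down.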

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
(X) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`26753`)

Summary by Sourcery

Add unified procedures support to the Lakehouse connector by wiring existing Hive, Delta Lake, and Iceberg procedures, introducing wrapper procedures that dispatch on table type, and validating them with new smoke tests.

New Features:

  • Expose a unified procedures API in the Lakehouse connector that delegates calls to the appropriate Hive, Delta Lake, or Iceberg implementation
  • Introduce wrapper procedures (drop_stats, flush_metadata_cache, register_table, unregister_table) that accept a tableType parameter and route to the correct backend

Enhancements:

  • Register Hive, Delta Lake, Iceberg, and mixed procedures in LakehouseModule and expose them via getProcedures in LakehouseConnector
  • Add optional bindings for GlueCache and DirectoryLister in the Hive module
  • Enable procedure-related session properties in BaseLakehouseConnectorSmokeTest

Tests:

  • Add smoke tests for lakehouse.system.* procedures in Hive, Delta Lake, and Iceberg connectors

cla-bot added the cla-signed label Oct 10, 2025
sourcery-ai bot commented Oct 10, 2025

Reviewer's Guide

Introduce full support for procedures in the Lakehouse connector by binding and exposing a unified procedure set, implementing provider-based wrapper procedures for Hive, Delta Lake, and Iceberg, integrating them into the connector API, and adding smoke tests to validate their behavior.

Sequence diagram for Lakehouse procedure delegation based on table type

```mermaid
sequenceDiagram
    participant Client
    participant LakehouseProcedure
    participant HiveProcedure
    participant DeltaProcedure
    participant IcebergProcedure
    Client->>LakehouseProcedure: Call procedure with tableType
    alt tableType == HIVE
        LakehouseProcedure->>HiveProcedure: Delegate procedure
    else tableType == DELTA
        LakehouseProcedure->>DeltaProcedure: Delegate procedure
    else tableType == ICEBERG
        LakehouseProcedure->>IcebergProcedure: Delegate procedure
    else unsupported
        LakehouseProcedure-->>Client: Error: Unsupported table type
    end
```

Class diagram for new Lakehouse procedure wrappers

```mermaid
classDiagram
    class LakehouseFlushMetadataCacheProcedure {
        +FlushMetadataCacheProcedure deltaLakeFlushMetadataCacheProcedure
        +FlushMetadataCacheProcedure hiveFlushMetadataCacheProcedure
        +Procedure get()
        +void flushMetadataCache(ConnectorSession, String, String, String, List<String>, List<String>)
    }
    class LakehouseDropStatsProcedure {
        +DropExtendedStatsProcedure deltaLakeDropStatsProcedure
        +DropStatsProcedure hiveDropStatsProcedure
        +Procedure get()
        +void dropStats(ConnectorSession, ConnectorAccessControl, String, String, String, List<?>)
    }
    class LakehouseRegisterTableProcedure {
        +RegisterTableProcedure deltaLakeRegisterTableProcedure
        +RegisterTableProcedure icebergRegisterTableProcedure
        +Procedure get()
        +void registerTable(ConnectorAccessControl, ConnectorSession, String, String, String, String, String)
    }
    class LakehouseUnregisterTableProcedure {
        +UnregisterTableProcedure deltaLakeUnregisterTableProcedure
        +UnregisterTableProcedure icebergUnregisterTableProcedure
        +Procedure get()
        +void unregisterTable(ConnectorAccessControl, ConnectorSession, String, String, String)
    }
    LakehouseFlushMetadataCacheProcedure --> FlushMetadataCacheProcedure
    LakehouseFlushMetadataCacheProcedure --> "io.trino.plugin.hive.procedure.FlushMetadataCacheProcedure"
    LakehouseDropStatsProcedure --> DropExtendedStatsProcedure
    LakehouseDropStatsProcedure --> DropStatsProcedure
    LakehouseRegisterTableProcedure --> RegisterTableProcedure
    LakehouseRegisterTableProcedure --> "io.trino.plugin.iceberg.procedure.RegisterTableProcedure"
    LakehouseUnregisterTableProcedure --> UnregisterTableProcedure
    LakehouseUnregisterTableProcedure --> "io.trino.plugin.iceberg.procedure.UnregisterTableProcedure"
```
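
Each wrapper implements Provider<Procedure>: get() builds a Procedure whose MethodHandle is bound to the wrapper instance. A sketch of that pattern — the DROP_STATS.bindTo(this) call is confirmed by the review excerpt further down, but the argument names and types here are assumptions:

```java
import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.type.ArrayType;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.util.List;

import static io.trino.spi.type.VarcharType.VARCHAR;

// Sketch: the real class also injects the delegate procedures, and its
// dropStats method additionally takes ConnectorSession and ConnectorAccessControl.
public class DropStatsProviderSketch
        implements Provider<Procedure>
{
    private static final MethodHandle DROP_STATS;

    static {
        try {
            DROP_STATS = MethodHandles.lookup().unreflect(
                    DropStatsProviderSketch.class.getMethod(
                            "dropStats", String.class, String.class, String.class, List.class));
        }
        catch (ReflectiveOperationException e) {
            throw new AssertionError(e);
        }
    }

    @Override
    public Procedure get()
    {
        return new Procedure(
                "system",
                "drop_stats",
                ImmutableList.of(
                        new Procedure.Argument("TABLE_TYPE", VARCHAR),
                        new Procedure.Argument("SCHEMA_NAME", VARCHAR),
                        new Procedure.Argument("TABLE_NAME", VARCHAR),
                        new Procedure.Argument("PARTITION_VALUES", new ArrayType(new ArrayType(VARCHAR)), false, null)),
                DROP_STATS.bindTo(this));
    }

    public void dropStats(String tableType, String schema, String table, List<?> partitionValues)
    {
        // delegate based on tableType, as in the sequence diagram above
    }
}
```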

File-Level Changes

Change: Bind and expose Lakehouse procedures in the connector (see the wiring sketch after this table)
Details:
  • Register individual procedures using a Multibinder in LakehouseModule
  • Add optional Hive GlueCache and DirectoryLister binders
  • Inject Set<Procedure> into LakehouseConnector and implement getProcedures()
  • Enable procedure-related properties in BaseLakehouseConnectorSmokeTest configuration
Files:
  • plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java
  • plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java
  • plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseHiveModule.java
  • plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/BaseLakehouseConnectorSmokeTest.java

Change: Implement wrapper Provider-based procedures for mixed table types
Details:
  • Create LakehouseDropStatsProcedure to delegate drop_stats calls
  • Create LakehouseFlushMetadataCacheProcedure to delegate flush_metadata_cache calls
  • Create LakehouseRegisterTableProcedure to delegate register_table calls
  • Create LakehouseUnregisterTableProcedure to delegate unregister_table calls
Files:
  • plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseDropStatsProcedure.java
  • plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseFlushMetadataCacheProcedure.java
  • plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseRegisterTableProcedure.java
  • plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseUnregisterTableProcedure.java

Change: Add smoke tests for new Lakehouse procedures
Details:
  • Add testProcedures methods in Hive, Delta, and Iceberg smoke test classes
  • Import randomNameSuffix and format utilities in test classes
  • Verify success and failure scenarios for each procedure call
Files:
  • plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java
  • plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java
  • plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java
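
As referenced in the first change above, a sketch of the wiring (the binding style is assumed from common Trino plugin modules; the wrapper class names are the ones this PR adds, and the actual LakehouseModule also binds the single-backend Hive, Delta Lake, and Iceberg procedures):

```java
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
import io.trino.plugin.lakehouse.procedures.LakehouseDropStatsProcedure;
import io.trino.plugin.lakehouse.procedures.LakehouseFlushMetadataCacheProcedure;
import io.trino.plugin.lakehouse.procedures.LakehouseRegisterTableProcedure;
import io.trino.plugin.lakehouse.procedures.LakehouseUnregisterTableProcedure;
import io.trino.spi.procedure.Procedure;

// Sketch of LakehouseModule's procedure registrations
public class ProcedureWiringSketch
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        Multibinder<Procedure> procedures = Multibinder.newSetBinder(binder, Procedure.class);
        procedures.addBinding().toProvider(LakehouseDropStatsProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(LakehouseFlushMetadataCacheProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(LakehouseRegisterTableProcedure.class).in(Scopes.SINGLETON);
        procedures.addBinding().toProvider(LakehouseUnregisterTableProcedure.class).in(Scopes.SINGLETON);
    }
}

// In LakehouseConnector the injected set then backs Connector.getProcedures():
//
//     @Override
//     public Set<Procedure> getProcedures()
//     {
//         return procedures; // the @Inject-ed Set<Procedure>
//     }
```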

Possibly linked issues

  • #26753: Lakehouse connector doesn't support procedures

@sourcery-ai bot left a comment
Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseDropStatsProcedure.java:89` </location>
<code_context>
+                DROP_STATS.bindTo(this));
+    }
+
+    public void dropStats(ConnectorSession session, ConnectorAccessControl accessControl, String tableType, String schema, String table, List<?> partitionValues)
+    {
+        if (TableType.DELTA.name().equals(tableType)) {
</code_context>

<issue_to_address>
**suggestion:** Consider using a more specific type for partitionValues.

List<?> reduces type safety. If partitionValues should be List<List<String>>, use that type to prevent runtime errors and improve code clarity.

Suggested implementation:

```java
    public void dropStats(ConnectorSession session, ConnectorAccessControl accessControl, String tableType, String schema, String table, List<List<String>> partitionValues)

```

```java
        else if (TableType.HIVE.name().equals(tableType)) {
            hiveDropStatsProcedure.dropStats(session, accessControl, schema, table, partitionValues);
        }

```
</issue_to_address>

### Comment 2
<location> `plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseDropStatsProcedure.java:91-95` </location>
<code_context>
+
+    public void dropStats(ConnectorSession session, ConnectorAccessControl accessControl, String tableType, String schema, String table, List<?> partitionValues)
+    {
+        if (TableType.DELTA.name().equals(tableType)) {
+            if (partitionValues != null) {
+                throw new IllegalArgumentException("Partition values are not supported for Delta Lake procedure");
+            }
+            deltaLakeDropStatsProcedure.dropStats(session, accessControl, schema, table);
+        }
+        else if (TableType.HIVE.name().equals(tableType)) {
</code_context>

<issue_to_address>
**suggestion:** Partition values check for Delta Lake could be stricter.

Also check that partitionValues is not empty to prevent unintended acceptance of empty lists.

```suggestion
        if (TableType.DELTA.name().equals(tableType)) {
            if (partitionValues != null && !partitionValues.isEmpty()) {
                throw new IllegalArgumentException("Partition values are not supported for Delta Lake procedure");
            }
            deltaLakeDropStatsProcedure.dropStats(session, accessControl, schema, table);
```
</issue_to_address>

### Comment 3
<location> `plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseRegisterTableProcedure.java:90-94` </location>
<code_context>
+        if (TableType.DELTA.name().equals(tableType)) {
+            if (metadataFileName != null) {
+                throw new IllegalArgumentException("Metadata file name value is not supported for Delta Lake procedure");
+            }
+            deltaLakeRegisterTableProcedure.registerTable(accessControl, session, schema, table, tableLocation);
</code_context>

<issue_to_address>
**suggestion:** Metadata file name validation for Delta Lake could be stricter.

Also validate that metadataFileName is not an empty string to prevent invalid input.
</issue_to_address>
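
Unlike the earlier comments, no suggested implementation was included here; one possible reading, as a sketch (the delegate registerTable signatures are assumed from the class diagram above):

```java
if (TableType.DELTA.name().equals(tableType)) {
    if (metadataFileName != null) {
        throw new IllegalArgumentException("Metadata file name value is not supported for Delta Lake procedure");
    }
    deltaLakeRegisterTableProcedure.registerTable(accessControl, session, schema, table, tableLocation);
}
else if (TableType.ICEBERG.name().equals(tableType)) {
    // Stricter check: reject an explicit empty string instead of relying on a downstream failure
    if (metadataFileName != null && metadataFileName.isEmpty()) {
        throw new IllegalArgumentException("metadataFileName must not be empty");
    }
    icebergRegisterTableProcedure.registerTable(accessControl, session, schema, table, tableLocation, metadataFileName);
}
```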

### Comment 4
<location> `plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseFlushMetadataCacheProcedure.java:91-98` </location>

<issue_to_address>
**suggestion:** Partition columns and values checks for Delta Lake could be stricter.

Currently, empty lists for partitionColumns and partitionValues are allowed, which may lead to unintended behavior. Enforcing that these parameters must be null or omitted would make the checks more robust.
</issue_to_address>
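
This comment also ships without a suggested implementation; a sketch of the stricter check (parameter names follow the flushMetadataCache signature in the class diagram above; the delegate call is assumed):

```java
if (TableType.DELTA.name().equals(tableType)) {
    // Treat empty lists the same as non-null values: Delta Lake takes no partition arguments
    if ((partitionColumns != null && !partitionColumns.isEmpty())
            || (partitionValues != null && !partitionValues.isEmpty())) {
        throw new IllegalArgumentException("Partition columns and values are not supported for Delta Lake procedure");
    }
    deltaLakeFlushMetadataCacheProcedure.flushMetadataCache(session, schemaName, tableName);
}
```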

### Comment 5
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java:97-98` </location>
<code_context>
+        assertThat(query(format("CALL lakehouse.system.sync_partition_metadata(CURRENT_SCHEMA, '%s', 'FULL')", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.drop_stats('HIVE', CURRENT_SCHEMA, '%s', ARRAY[ARRAY['p1']])", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.flush_metadata_cache('HIVE', CURRENT_SCHEMA, '%s', ARRAY['part'])", tableName)))
</code_context>

<issue_to_address>
**suggestion (testing):** Missing negative test for drop_stats with unsupported tableType.

Add a test that invokes drop_stats with an invalid tableType (such as 'FOO') and verifies that the correct error is raised.

```suggestion
        assertThat(query(format("CALL lakehouse.system.drop_stats('HIVE', CURRENT_SCHEMA, '%s', ARRAY[ARRAY['p1']])", tableName)))
                .succeeds().returnsEmptyResult();

        // Negative test: unsupported tableType
        assertThat(query(format("CALL lakehouse.system.drop_stats('FOO', CURRENT_SCHEMA, '%s', ARRAY[ARRAY['p1']])", tableName)))
                .failure().hasMessageContaining("Unsupported tableType: FOO");
```
</issue_to_address>

### Comment 6
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java:75-76` </location>
<code_context>
+        assertThat(query(format("CALL lakehouse.system.vacuum(CURRENT_SCHEMA, '%s', '8.00d')", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.drop_stats('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.register_table('DELTA', CURRENT_SCHEMA, '%s', 's3://bucket/table')", tableName)))
</code_context>

<issue_to_address>
**suggestion (testing):** Missing test for drop_stats with partition values for DELTA tableType.

Add a test for drop_stats with DELTA tableType and non-null partition values, asserting it fails with the correct error message to confirm validation logic.

```suggestion
        assertThat(query(format("CALL lakehouse.system.drop_stats('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
                .succeeds().returnsEmptyResult();

        // Test drop_stats with DELTA tableType and non-null partition values; should fail with the validation error
        assertThat(query(format("CALL lakehouse.system.drop_stats('DELTA', CURRENT_SCHEMA, '%s', ARRAY[ARRAY['p1']])", tableName)))
                .failure().hasMessageContaining("Partition values are not supported for Delta Lake procedure");
```
</issue_to_address>

### Comment 7
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java:78-79` </location>
<code_context>
+        assertThat(query(format("CALL lakehouse.system.drop_stats('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
+                .succeeds().returnsEmptyResult();
+
+        assertThat(query(format("CALL lakehouse.system.register_table('DELTA', CURRENT_SCHEMA, '%s', 's3://bucket/table')", tableName)))
+                .failure().hasMessage("Failed checking table location s3://bucket/table");
+
+        assertThat(query(format("CALL lakehouse.system.unregister_table('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
</code_context>

<issue_to_address>
**suggestion (testing):** Missing test for register_table with metadataFileName for DELTA tableType.

Add a test for register_table with DELTA tableType and a non-null metadataFileName, verifying it fails with the expected error to cover argument validation.

```suggestion
        assertThat(query(format("CALL lakehouse.system.register_table('DELTA', CURRENT_SCHEMA, '%s', 's3://bucket/table')", tableName)))
                .failure().hasMessage("Failed checking table location s3://bucket/table");

        // Test register_table with DELTA tableType and a non-null metadataFileName; should fail with the validation error
        assertThat(query(format("CALL lakehouse.system.register_table('DELTA', CURRENT_SCHEMA, '%s', 's3://bucket/table', 'metadata.json')", tableName)))
                .failure().hasMessageContaining("Metadata file name value is not supported for Delta Lake procedure");
```
</issue_to_address>

### Comment 8
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java:63-64` </location>
<code_context>
+        assertThat(query(format("CALL lakehouse.system.register_table('ICEBERG', CURRENT_SCHEMA, '%s', 's3://bucket/table')", tableName)))
+                .failure().hasMessage("Failed checking table location: s3://bucket/table");
+
+        assertThat(query(format("CALL lakehouse.system.unregister_table('ICEBERG', CURRENT_SCHEMA, '%s')", tableName)))
+                .succeeds().returnsEmptyResult();
+    }
 }
</code_context>

<issue_to_address>
**suggestion (testing):** Missing negative test for unregister_table with unsupported tableType.

Add a test for unregister_table using an unsupported tableType (e.g., 'FOO') and verify it fails with the correct error message.

```suggestion
        assertThat(query(format("CALL lakehouse.system.unregister_table('ICEBERG', CURRENT_SCHEMA, '%s')", tableName)))
                .succeeds().returnsEmptyResult();

        // Negative test: unsupported tableType
        assertThat(query(format("CALL lakehouse.system.unregister_table('FOO', CURRENT_SCHEMA, '%s')", tableName)))
                .failure().hasMessageContaining("Unsupported table type");
```
</issue_to_address>

