Add procedures support for Lakehouse #26917
base: master
Conversation
The LakehouseModule declares the procedures from Hive, Delta Lake and Iceberg. If the same procedure appears in two modules, a wrapper procedure is declared in the subpackage 'procedures'. The new procedure has an extra first parameter named 'tableType' (possible values 'HIVE', 'DELTA' and 'ICEBERG'). Based on the 'tableType', the wrapper procedure delegates to the actual procedure from the concrete module.
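To make the delegation concrete, here is a minimal sketch of the pattern (not the PR's exact code; the Delta Lake import path and the error message wording are assumptions, while the delegate signatures follow the review comments below):

```java
import java.util.List;

import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure; // package path is an assumption
import io.trino.plugin.hive.procedure.DropStatsProcedure;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorSession;

// Sketch of the dispatch pattern described above -- illustrative, not the
// exact PR code. Delegates are the concrete procedures from the Delta Lake
// and Hive modules; the error message wording follows the sequence diagram.
public class LakehouseDropStatsSketch
{
    enum TableType { HIVE, DELTA, ICEBERG }

    private final DropExtendedStatsProcedure deltaLakeDropStatsProcedure;
    private final DropStatsProcedure hiveDropStatsProcedure;

    LakehouseDropStatsSketch(DropExtendedStatsProcedure delta, DropStatsProcedure hive)
    {
        this.deltaLakeDropStatsProcedure = delta;
        this.hiveDropStatsProcedure = hive;
    }

    public void dropStats(ConnectorSession session, ConnectorAccessControl accessControl,
            String tableType, String schema, String table, List<?> partitionValues)
    {
        if (TableType.DELTA.name().equals(tableType)) {
            // the Delta Lake variant takes no partition values
            deltaLakeDropStatsProcedure.dropStats(session, accessControl, schema, table);
        }
        else if (TableType.HIVE.name().equals(tableType)) {
            hiveDropStatsProcedure.dropStats(session, accessControl, schema, table, partitionValues);
        }
        else {
            throw new IllegalArgumentException("Unsupported table type: " + tableType);
        }
    }
}
```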
Reviewer's Guide

Introduce full support for procedures in the Lakehouse connector by binding and exposing a unified procedure set, implementing provider-based wrapper procedures for Hive, Delta Lake, and Iceberg, integrating them into the connector API, and adding smoke tests to validate their behavior.

Sequence diagram for Lakehouse procedure delegation based on table type

```mermaid
sequenceDiagram
    participant Client
    participant LakehouseProcedure
    participant HiveProcedure
    participant DeltaProcedure
    participant IcebergProcedure
    Client->>LakehouseProcedure: Call procedure with tableType
    alt tableType == HIVE
        LakehouseProcedure->>HiveProcedure: Delegate procedure
    else tableType == DELTA
        LakehouseProcedure->>DeltaProcedure: Delegate procedure
    else tableType == ICEBERG
        LakehouseProcedure->>IcebergProcedure: Delegate procedure
    else unsupported
        LakehouseProcedure-->>Client: Error: Unsupported table type
    end
```
Class diagram for new Lakehouse procedure wrappers

```mermaid
classDiagram
    class LakehouseFlushMetadataCacheProcedure {
        +FlushMetadataCacheProcedure deltaLakeFlushMetadataCacheProcedure
        +FlushMetadataCacheProcedure hiveFlushMetadataCacheProcedure
        +Procedure get()
        +void flushMetadataCache(ConnectorSession, String, String, String, List~String~, List~String~)
    }
    class LakehouseDropStatsProcedure {
        +DropExtendedStatsProcedure deltaLakeDropStatsProcedure
        +DropStatsProcedure hiveDropStatsProcedure
        +Procedure get()
        +void dropStats(ConnectorSession, ConnectorAccessControl, String, String, String, List~?~)
    }
    class LakehouseRegisterTableProcedure {
        +RegisterTableProcedure deltaLakeRegisterTableProcedure
        +RegisterTableProcedure icebergRegisterTableProcedure
        +Procedure get()
        +void registerTable(ConnectorAccessControl, ConnectorSession, String, String, String, String, String)
    }
    class LakehouseUnregisterTableProcedure {
        +UnregisterTableProcedure deltaLakeUnregisterTableProcedure
        +UnregisterTableProcedure icebergUnregisterTableProcedure
        +Procedure get()
        +void unregisterTable(ConnectorAccessControl, ConnectorSession, String, String, String)
    }
    LakehouseFlushMetadataCacheProcedure --> FlushMetadataCacheProcedure
    LakehouseFlushMetadataCacheProcedure --> `io.trino.plugin.hive.procedure.FlushMetadataCacheProcedure`
    LakehouseDropStatsProcedure --> DropExtendedStatsProcedure
    LakehouseDropStatsProcedure --> DropStatsProcedure
    LakehouseRegisterTableProcedure --> RegisterTableProcedure
    LakehouseRegisterTableProcedure --> `io.trino.plugin.iceberg.procedure.RegisterTableProcedure`
    LakehouseUnregisterTableProcedure --> UnregisterTableProcedure
    LakehouseUnregisterTableProcedure --> `io.trino.plugin.iceberg.procedure.UnregisterTableProcedure`
```
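Each wrapper is a `Provider<Procedure>` whose `get()` builds the SQL-callable procedure and binds the dispatch method, matching the `DROP_STATS.bindTo(this)` fragment visible in the review context below. A rough sketch following the common Trino procedure pattern (the `methodHandle` helper, argument names, and `VARCHAR` types are assumptions, not the PR's exact code):

```java
import static io.trino.plugin.base.util.Reflection.methodHandle;
import static io.trino.spi.type.VarcharType.VARCHAR;

import java.lang.invoke.MethodHandle;

import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.procedure.Procedure;
import io.trino.spi.procedure.Procedure.Argument;

// Rough sketch of one wrapper as a Provider<Procedure>. Argument names and
// the delegate dispatch are assumptions based on the diagrams above.
public class LakehouseUnregisterTableProcedureSketch
        implements Provider<Procedure>
{
    private static final MethodHandle UNREGISTER_TABLE = methodHandle(
            LakehouseUnregisterTableProcedureSketch.class,
            "unregisterTable",
            ConnectorAccessControl.class,
            ConnectorSession.class,
            String.class,
            String.class,
            String.class);

    @Override
    public Procedure get()
    {
        return new Procedure(
                "system",
                "unregister_table",
                ImmutableList.of(
                        new Argument("TABLE_TYPE", VARCHAR), // the extra leading parameter
                        new Argument("SCHEMA_NAME", VARCHAR),
                        new Argument("TABLE_NAME", VARCHAR)),
                UNREGISTER_TABLE.bindTo(this));
    }

    public void unregisterTable(ConnectorAccessControl accessControl, ConnectorSession session,
            String tableType, String schema, String table)
    {
        // dispatch on tableType to the Delta Lake or Iceberg delegate,
        // as in the sequence diagram above
    }
}
```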
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseDropStatsProcedure.java:89` </location>
<code_context>
+ DROP_STATS.bindTo(this));
+ }
+
+ public void dropStats(ConnectorSession session, ConnectorAccessControl accessControl, String tableType, String schema, String table, List<?> partitionValues)
+ {
+ if (TableType.DELTA.name().equals(tableType)) {
</code_context>
<issue_to_address>
**suggestion:** Consider using a more specific type for partitionValues.
List<?> reduces type safety. If partitionValues should be List<List<String>>, use that type to prevent runtime errors and improve code clarity.
Suggested implementation:
```java
public void dropStats(ConnectorSession session, ConnectorAccessControl accessControl, String tableType, String schema, String table, List<List<String>> partitionValues)
```
```java
else if (TableType.HIVE.name().equals(tableType)) {
hiveDropStatsProcedure.dropStats(session, accessControl, schema, table, partitionValues);
}
```
</issue_to_address>
### Comment 2
<location> `plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseDropStatsProcedure.java:91-95` </location>
<code_context>
+
+ public void dropStats(ConnectorSession session, ConnectorAccessControl accessControl, String tableType, String schema, String table, List<?> partitionValues)
+ {
+ if (TableType.DELTA.name().equals(tableType)) {
+ if (partitionValues != null) {
+ throw new IllegalArgumentException("Partition values are not supported for Delta Lake procedure");
+ }
+ deltaLakeDropStatsProcedure.dropStats(session, accessControl, schema, table);
+ }
+ else if (TableType.HIVE.name().equals(tableType)) {
</code_context>
<issue_to_address>
**suggestion:** Partition values check for Delta Lake could be stricter.
Also check that partitionValues is not empty to prevent unintended acceptance of empty lists.
```suggestion
if (TableType.DELTA.name().equals(tableType)) {
if (partitionValues != null && !partitionValues.isEmpty()) {
throw new IllegalArgumentException("Partition values are not supported for Delta Lake procedure");
}
deltaLakeDropStatsProcedure.dropStats(session, accessControl, schema, table);
```
</issue_to_address>
### Comment 3
<location> `plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseRegisterTableProcedure.java:90-94` </location>
<code_context>
+        if (TableType.DELTA.name().equals(tableType)) {
+            if (metadataFileName != null) {
+                throw new IllegalArgumentException("Metadata file name value is not supported for Delta Lake procedure");
+            }
+            deltaLakeRegisterTableProcedure.registerTable(accessControl, session, schema, table, tableLocation);
</code_context>
<issue_to_address>
**suggestion:** Metadata file name validation for Delta Lake could be stricter.
Also validate that metadataFileName is not an empty string to prevent invalid input.
</issue_to_address>
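One way to apply this suggestion (a sketch; the exact message wording is an assumption):

```java
// On the branch that accepts metadataFileName, also reject blank values
// (sketch; message wording is an assumption)
if (metadataFileName != null && metadataFileName.isBlank()) {
    throw new IllegalArgumentException("metadataFileName must not be empty");
}
```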
### Comment 4
<location> `plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/procedures/LakehouseFlushMetadataCacheProcedure.java:91-98` </location>
<issue_to_address>
**suggestion:** Partition columns and values checks for Delta Lake could be stricter.
Currently, empty lists for partitionColumns and partitionValues are allowed, which may lead to unintended behavior. Enforcing that these parameters must be null or omitted would make the checks more robust.
</issue_to_address>
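A sketch of the stricter check (parameter names follow the wrapper signature in the class diagram above; message wording is an assumption):

```java
// For the DELTA branch, require that partition arguments are omitted entirely
if (partitionColumns != null || partitionValues != null) {
    throw new IllegalArgumentException(
            "Partition columns and values are not supported for Delta Lake procedure");
}
// ...then delegate to the Delta Lake flush procedure
```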
### Comment 5
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseHiveConnectorSmokeTest.java:97-98` </location>
<code_context>
+ assertThat(query(format("CALL lakehouse.system.sync_partition_metadata(CURRENT_SCHEMA, '%s', 'FULL')", tableName)))
+ .succeeds().returnsEmptyResult();
+
+ assertThat(query(format("CALL lakehouse.system.drop_stats('HIVE', CURRENT_SCHEMA, '%s', ARRAY[ARRAY['p1']])", tableName)))
+ .succeeds().returnsEmptyResult();
+
+ assertThat(query(format("CALL lakehouse.system.flush_metadata_cache('HIVE', CURRENT_SCHEMA, '%s', ARRAY['part'])", tableName)))
</code_context>
<issue_to_address>
**suggestion (testing):** Missing negative test for drop_stats with unsupported tableType.
Add a test that invokes drop_stats with an invalid tableType (such as 'FOO') and verifies that the correct error is raised.
```suggestion
assertThat(query(format("CALL lakehouse.system.drop_stats('HIVE', CURRENT_SCHEMA, '%s', ARRAY[ARRAY['p1']])", tableName)))
.succeeds().returnsEmptyResult();
// Negative test: unsupported tableType
assertThat(query(format("CALL lakehouse.system.drop_stats('FOO', CURRENT_SCHEMA, '%s', ARRAY[ARRAY['p1']])", tableName)))
.failure().hasMessageContaining("Unsupported tableType: FOO");
```
</issue_to_address>
### Comment 6
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java:75-76` </location>
<code_context>
+ assertThat(query(format("CALL lakehouse.system.vacuum(CURRENT_SCHEMA, '%s', '8.00d')", tableName)))
+ .succeeds().returnsEmptyResult();
+
+ assertThat(query(format("CALL lakehouse.system.drop_stats('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
+ .succeeds().returnsEmptyResult();
+
+ assertThat(query(format("CALL lakehouse.system.register_table('DELTA', CURRENT_SCHEMA, '%s', 's3://bucket/table')", tableName)))
</code_context>
<issue_to_address>
**suggestion (testing):** Missing test for drop_stats with partition values for DELTA tableType.
Add a test for drop_stats with DELTA tableType and non-null partition values, asserting it fails with the correct error message to confirm validation logic.
```suggestion
assertThat(query(format("CALL lakehouse.system.drop_stats('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
.succeeds().returnsEmptyResult();
// Test drop_stats with DELTA tableType and non-null partition values, should fail with validation error
assertThat(query(format("CALL lakehouse.system.drop_stats('DELTA', CURRENT_SCHEMA, '%s', ARRAY['partition_value'])", tableName)))
.failure().hasMessage("Partition values are not supported for DELTA tableType in drop_stats procedure");
```
</issue_to_address>
### Comment 7
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java:78-79` </location>
<code_context>
+ assertThat(query(format("CALL lakehouse.system.drop_stats('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
+ .succeeds().returnsEmptyResult();
+
+ assertThat(query(format("CALL lakehouse.system.register_table('DELTA', CURRENT_SCHEMA, '%s', 's3://bucket/table')", tableName)))
+ .failure().hasMessage("Failed checking table location s3://bucket/table");
+
+ assertThat(query(format("CALL lakehouse.system.unregister_table('DELTA', CURRENT_SCHEMA, '%s')", tableName)))
</code_context>
<issue_to_address>
**suggestion (testing):** Missing test for register_table with metadataFileName for DELTA tableType.
Add a test for register_table with DELTA tableType and a non-null metadataFileName, verifying it fails with the expected error to cover argument validation.
```suggestion
assertThat(query(format("CALL lakehouse.system.register_table('DELTA', CURRENT_SCHEMA, '%s', 's3://bucket/table')", tableName)))
.failure().hasMessage("Failed checking table location s3://bucket/table");
// Test register_table with DELTA tableType and non-null metadataFileName, should fail with argument validation error
String metadataFileName = "metadata.json";
assertThat(query(format("CALL lakehouse.system.register_table('DELTA', CURRENT_SCHEMA, '%s', 's3://bucket/table', '%s')", tableName, metadataFileName)))
.failure().hasMessageContaining("metadataFileName argument is not supported for DELTA tableType");
```
</issue_to_address>
### Comment 8
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java:63-64` </location>
<code_context>
+ assertThat(query(format("CALL lakehouse.system.register_table('ICEBERG', CURRENT_SCHEMA, '%s', 's3://bucket/table')", tableName)))
+ .failure().hasMessage("Failed checking table location: s3://bucket/table");
+
+ assertThat(query(format("CALL lakehouse.system.unregister_table('ICEBERG', CURRENT_SCHEMA, '%s')", tableName)))
+ .succeeds().returnsEmptyResult();
+ }
}
</code_context>
<issue_to_address>
**suggestion (testing):** Missing negative test for unregister_table with unsupported tableType.
Add a test for unregister_table using an unsupported tableType (e.g., 'FOO') and verify it fails with the correct error message.
```suggestion
assertThat(query(format("CALL lakehouse.system.unregister_table('ICEBERG', CURRENT_SCHEMA, '%s')", tableName)))
.succeeds().returnsEmptyResult();
// Negative test: unsupported tableType
assertThat(query(format("CALL lakehouse.system.unregister_table('FOO', CURRENT_SCHEMA, '%s')", tableName)))
.failure().hasMessage("Unsupported table type: FOO");
```
</issue_to_address>
Add procedures support for Lakehouse
Fixes #26753
Description
The LakehouseModule declares the procedures from Hive, Delta Lake and Iceberg.
If the same procedure appears in two modules, a wrapper procedure is declared in the subpackage 'procedures'.
The new procedure has an extra first parameter named 'tableType' (possible values 'HIVE', 'DELTA' and 'ICEBERG').
Based on the 'tableType', the wrapper procedure delegates to the actual procedure from the concrete module.
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
(X) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:
Summary by Sourcery
Add unified procedures support to the Lakehouse connector by wiring existing Hive, Delta Lake, and Iceberg procedures, introducing wrapper procedures that dispatch on table type, and validating them with new smoke tests.