2 changes: 2 additions & 0 deletions .gitignore
@@ -49,6 +49,8 @@ src/log/

log/

*.log

.risingwave/
.bin/

6 changes: 4 additions & 2 deletions e2e_test/sink/iceberg_sink.slt
@@ -8,8 +8,10 @@ statement ok
CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg',
sink.mode='append-only',
location.type='minio',
warehouse.path='minio://hummockadmin:hummockadmin@127.0.0.1:9301/iceberg',
warehouse.path = 's3://iceberg',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
database.name='demo_db',
table.name='demo_table'
);
6 changes: 4 additions & 2 deletions integration_tests/iceberg-sink/create_sink.sql
@@ -3,8 +3,10 @@ FROM
bhv_mv WITH (
connector = 'iceberg',
sink.mode='upsert',
location.type='minio',
warehouse.path='minio://hummockadmin:hummockadmin@minio-0:9301/hummock001/iceberg-data',
warehouse.path = 's3://hummock001/iceberg-data',
s3.endpoint = 'http://minio-0:9301',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
database.name='demo_db',
table.name='demo_table'
);
12 changes: 8 additions & 4 deletions java/connector-node/python-client/integration_tests.py
@@ -161,17 +161,21 @@ def test_print_sink(input_file):
def test_iceberg_sink(input_file):
test_sink("iceberg",
{"sink.mode":"append-only",
"location.type":"minio",
"warehouse.path":"minio://minioadmin:[email protected]:9000/bucket",
"warehouse.path":"s3a://bucket",
"s3.endpoint": "http://127.0.0.1:9000",
"s3.access.key": "minioadmin",
"s3.secret.key": "minioadmin",
"database.name":"demo_db",
"table.name":"demo_table"},
input_file)

def test_upsert_iceberg_sink(input_file):
test_upsert_sink("iceberg",
{"sink.mode":"upsert",
"location.type":"minio",
"warehouse.path":"minio://minioadmin:[email protected]:9000/bucket",
"warehouse.path":"s3a://bucket",
"s3.endpoint": "http://127.0.0.1:9000",
"s3.access.key": "minioadmin",
"s3.secret.key": "minioadmin",
"database.name":"demo_db",
"table.name":"demo_table"},
input_file)
IcebergSinkFactory.java
@@ -20,8 +20,9 @@
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkBase;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.java.utils.MinioUrlParser;
import io.grpc.Status;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -38,11 +39,15 @@ public class IcebergSinkFactory implements SinkFactory {
private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkFactory.class);

public static final String SINK_MODE_PROP = "sink.mode";
public static final String LOCATION_TYPE_PROP = "location.type";
public static final String WAREHOUSE_PATH_PROP = "warehouse.path";
public static final String DATABASE_NAME_PROP = "database.name";
public static final String TABLE_NAME_PROP = "table.name";
public static final String S3_ACCESS_KEY_PROP = "s3.access.key";
public static final String S3_SECRET_KEY_PROP = "s3.secret.key";
public static final String S3_ENDPOINT_PROP = "s3.endpoint";
public static final FileFormat FILE_FORMAT = FileFormat.PARQUET;

// hadoop catalog config
private static final String confEndpoint = "fs.s3a.endpoint";
private static final String confKey = "fs.s3a.access.key";
private static final String confSecret = "fs.s3a.secret.key";
@@ -56,83 +61,88 @@ public SinkBase create(TableSchema tableSchema, Map<String, String> tableProperties) {
validate(tableSchema, tableProperties);

String mode = tableProperties.get(SINK_MODE_PROP);
String location = tableProperties.get(LOCATION_TYPE_PROP);
String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
String warehousePath = getWarehousePath(tableProperties);
String databaseName = tableProperties.get(DATABASE_NAME_PROP);
String tableName = tableProperties.get(TABLE_NAME_PROP);

String scheme = parseWarehousePathScheme(warehousePath);

TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
HadoopCatalog hadoopCatalog = createHadoopCatalog(location, warehousePath);
Configuration hadoopConf = createHadoopConf(scheme, tableProperties);
HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath);
Table icebergTable;
try {
icebergTable = hadoopCatalog.loadTable(tableIdentifier);
} catch (Exception e) {
LOG.error("load table error: {}", e);
throw Status.FAILED_PRECONDITION
.withDescription("failed to load iceberg table")
.withDescription(
String.format("failed to load iceberg table: %s", e.getMessage()))
.withCause(e)
.asRuntimeException();
}

if (mode.equals("append-only")) {
return new IcebergSink(tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT);
} else if (mode.equals("upsert")) {
return new UpsertIcebergSink(tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT);
return new UpsertIcebergSink(
tableSchema, hadoopCatalog,
icebergTable, FILE_FORMAT);
}
throw UNIMPLEMENTED.withDescription("unsupported mode: " + mode).asRuntimeException();
}

@Override
public void validate(TableSchema tableSchema, Map<String, String> tableProperties) {
if (!tableProperties.containsKey(SINK_MODE_PROP) // only append-only, upsert
|| !tableProperties.containsKey(LOCATION_TYPE_PROP) // only local, s3, minio
|| !tableProperties.containsKey(WAREHOUSE_PATH_PROP)
|| !tableProperties.containsKey(DATABASE_NAME_PROP)
|| !tableProperties.containsKey(TABLE_NAME_PROP)) {
throw INVALID_ARGUMENT
.withDescription(
String.format(
"%s, %s, %s, %s or %s is not specified",
"%s, %s, %s or %s is not specified",
SINK_MODE_PROP,
LOCATION_TYPE_PROP,
WAREHOUSE_PATH_PROP,
DATABASE_NAME_PROP,
TABLE_NAME_PROP))
.asRuntimeException();
}

String mode = tableProperties.get(SINK_MODE_PROP);
String location = tableProperties.get(LOCATION_TYPE_PROP);
String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
String databaseName = tableProperties.get(DATABASE_NAME_PROP);
String tableName = tableProperties.get(TABLE_NAME_PROP);
String warehousePath = getWarehousePath(tableProperties);

String schema = parseWarehousePathScheme(warehousePath);

TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName);
HadoopCatalog hadoopCatalog = createHadoopCatalog(location, warehousePath);
Configuration hadoopConf = createHadoopConf(schema, tableProperties);
HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath);
Table icebergTable;
try {
icebergTable = hadoopCatalog.loadTable(tableIdentifier);
} catch (Exception e) {
LOG.error("load table error: {}", e);
throw Status.FAILED_PRECONDITION
.withDescription("failed to load iceberg table")
.withDescription(
String.format("failed to load iceberg table: %s", e.getMessage()))
.withCause(e)
.asRuntimeException();
}
// check that all columns in tableSchema exist in the iceberg table
for (String columnName : tableSchema.getColumnNames()) {
if (icebergTable.schema().findField(columnName) == null) {
LOG.error("column not found: {}", columnName);
throw Status.FAILED_PRECONDITION
.withDescription("table schema does not match")
.withDescription(
String.format(
"table schema does not match. Column %s not found in iceberg table",
columnName))
.asRuntimeException();
}
}
// check that all required columns in the iceberg table exist in tableSchema
Set<String> columnNames = Set.of(tableSchema.getColumnNames());
for (Types.NestedField column : icebergTable.schema().columns()) {
if (column.isRequired() && !columnNames.contains(column.name())) {
LOG.error("required column not found: {}", column.name());
throw Status.FAILED_PRECONDITION
.withDescription(
String.format("missing a required field %s", column.name()))
Expand All @@ -153,26 +163,62 @@ public void validate(TableSchema tableSchema, Map<String, String> tablePropertie
}
}

private HadoopCatalog createHadoopCatalog(String location, String warehousePath) {
Configuration hadoopConf = new Configuration();
switch (location) {
case "local":
return new HadoopCatalog(hadoopConf, warehousePath);
case "s3":
hadoopConf.set(confIoImpl, s3FileIOImpl);
String s3aPath = "s3a:" + warehousePath.substring(warehousePath.indexOf('/'));
return new HadoopCatalog(hadoopConf, s3aPath);
case "minio":
private static String getWarehousePath(Map<String, String> tableProperties) {
String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP);
// unify s3 and s3a
if (warehousePath.startsWith("s3://")) {
return warehousePath.replace("s3://", "s3a://");
}
return warehousePath;
}

private static String parseWarehousePathScheme(String warehousePath) {
try {
URI uri = new URI(warehousePath);
String scheme = uri.getScheme();
if (scheme == null) {
throw INVALID_ARGUMENT
.withDescription("warehouse path should set scheme (e.g. s3a://)")
.asRuntimeException();
}
return scheme;
} catch (URISyntaxException e) {
throw INVALID_ARGUMENT
.withDescription(
String.format("invalid warehouse path uri: %s", e.getMessage()))
.withCause(e)
.asRuntimeException();
}
}

private Configuration createHadoopConf(String scheme, Map<String, String> tableProperties) {
switch (scheme) {
case "file":
return new Configuration();
case "s3a":
Configuration hadoopConf = new Configuration();
hadoopConf.set(confIoImpl, s3FileIOImpl);
MinioUrlParser minioUrlParser = new MinioUrlParser(warehousePath);
hadoopConf.set(confEndpoint, minioUrlParser.getEndpoint());
hadoopConf.set(confKey, minioUrlParser.getKey());
hadoopConf.set(confSecret, minioUrlParser.getSecret());
hadoopConf.setBoolean(confPathStyleAccess, true);
return new HadoopCatalog(hadoopConf, "s3a://" + minioUrlParser.getBucket());
if (!tableProperties.containsKey(S3_ENDPOINT_PROP)) {
throw INVALID_ARGUMENT
.withDescription(
String.format(
"Should set %s for warehouse with scheme %s",
S3_ENDPOINT_PROP, scheme))
.asRuntimeException();
}
hadoopConf.set(confEndpoint, tableProperties.get(S3_ENDPOINT_PROP));
if (tableProperties.containsKey(S3_ACCESS_KEY_PROP)) {
hadoopConf.set(confKey, tableProperties.get(S3_ACCESS_KEY_PROP));
}
if (tableProperties.containsKey(S3_SECRET_KEY_PROP)) {
hadoopConf.set(confSecret, tableProperties.get(S3_SECRET_KEY_PROP));
}
return hadoopConf;
default:
throw UNIMPLEMENTED
.withDescription("unsupported iceberg sink type: " + location)
.withDescription(
String.format("scheme %s not supported for warehouse path", scheme))
.asRuntimeException();
}
}
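
For context, a minimal standalone sketch of the configuration flow the new factory code follows: normalize an s3:// warehouse path to s3a://, derive the scheme with java.net.URI, and populate a Hadoop Configuration for the HadoopCatalog. The wrapper class and property values below are illustrative assumptions, not part of this PR, and the FileIO and table-loading steps are omitted; the fs.s3a.* keys match the constants shown above.

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;

// Hypothetical standalone sketch; names and values are placeholders for illustration.
public class WarehouseConfigSketch {
    public static void main(String[] args) throws URISyntaxException {
        // Example sink properties in the new format (values are illustrative only).
        Map<String, String> props = Map.of(
                "warehouse.path", "s3://bucket/path",
                "s3.endpoint", "http://127.0.0.1:9000",
                "s3.access.key", "minioadmin",
                "s3.secret.key", "minioadmin");

        // Unify s3:// and s3a://, as getWarehousePath does.
        String warehousePath = props.get("warehouse.path");
        if (warehousePath.startsWith("s3://")) {
            warehousePath = warehousePath.replace("s3://", "s3a://");
        }

        // Derive the scheme from the warehouse URI, as parseWarehousePathScheme does.
        String scheme = new URI(warehousePath).getScheme();

        Configuration hadoopConf = new Configuration();
        if ("s3a".equals(scheme)) {
            // Point the S3A connector at the user-supplied endpoint and credentials.
            // (The factory additionally sets an Iceberg FileIO implementation and path-style
            // access via its confIoImpl/confPathStyleAccess constants, whose values this
            // diff does not show.)
            hadoopConf.set("fs.s3a.endpoint", props.get("s3.endpoint"));
            hadoopConf.set("fs.s3a.access.key", props.get("s3.access.key"));
            hadoopConf.set("fs.s3a.secret.key", props.get("s3.secret.key"));
        }

        // A Hadoop catalog rooted at the warehouse path; table loading is omitted here.
        HadoopCatalog catalog = new HadoopCatalog(hadoopConf, warehousePath);
        System.out.println("catalog ready for " + warehousePath + " via " + catalog);
    }
}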
IcebergSinkFactoryTest.java
@@ -30,10 +30,9 @@
import org.junit.Test;

public class IcebergSinkFactoryTest {
static String warehousePath = "/tmp/rw-sinknode/iceberg-sink/warehouse";
static String warehousePath = "file:///tmp/rw-sinknode/iceberg-sink/warehouse";
static String databaseName = "demo_db";
static String tableName = "demo_table";
static String locationType = "local";
static String sinkMode = "append-only";
static Schema icebergTableSchema =
new Schema(
@@ -67,8 +66,6 @@ public void testCreate() throws IOException {
Map.of(
IcebergSinkFactory.SINK_MODE_PROP,
sinkMode,
IcebergSinkFactory.LOCATION_TYPE_PROP,
locationType,
IcebergSinkFactory.WAREHOUSE_PATH_PROP,
warehousePath,
IcebergSinkFactory.DATABASE_NAME_PROP,