Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions ci/scripts/e2e-sink-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev
mv target/debug/librisingwave_java_binding.so-"$profile" target/debug/librisingwave_java_binding.so

export RW_JAVA_BINDING_LIB_PATH=${PWD}/target/debug
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=stream_chunk
# TODO: Switch to stream_chunk encoding once it's completed, and then remove json encoding as well as this env var.
export RW_CONNECTOR_RPC_SINK_PAYLOAD_FORMAT=json
Comment on lines +34 to +35
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that here I switched back to JSON encoding because it looks not so easy to implement decoding for decimal & timestamp right now.


echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
Expand All @@ -55,7 +56,7 @@ mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE DATABASE IF NOT EXIST
# grant access to `test` for ci test user
mysql --host=mysql --port=3306 -u root -p123456 -e "GRANT ALL PRIVILEGES ON test.* TO 'mysqluser'@'%';"
# create a table named t_remote
mysql --host=mysql --port=3306 -u root -p123456 -e "CREATE TABLE IF NOT EXISTS test.t_remote (id INT, name VARCHAR(255), PRIMARY KEY (id));"
mysql --host=mysql --port=3306 -u root -p123456 test < ./e2e_test/sink/remote/mysql_create_table.sql

echo "--- preparing postgresql"

Expand All @@ -65,7 +66,7 @@ export PGPASSWORD=postgres
psql -h db -U postgres -c "CREATE ROLE test LOGIN SUPERUSER PASSWORD 'connector';"
createdb -h db -U postgres test
psql -h db -U postgres -d test -c "CREATE TABLE t4 (v1 int PRIMARY KEY, v2 int);"
psql -h db -U postgres -d test -c "CREATE TABLE t_remote (id serial PRIMARY KEY, name VARCHAR (50) NOT NULL);"
psql -h db -U postgres -d test < ./e2e_test/sink/remote/pg_create_table.sql

node_port=50051
node_timeout=10
Expand Down Expand Up @@ -106,13 +107,9 @@ sqllogictest -h db -p 5432 -d test './e2e_test/sink/remote/jdbc.check.pg.slt'
sleep 1

# check sink destination mysql using shell
if mysql --host=mysql --port=3306 -u root -p123456 -sN -e "SELECT * FROM test.t_remote ORDER BY id;" | awk '{
if ($1 == 1 && $2 == "Alex") c1++;
if ($1 == 3 && $2 == "Carl") c2++;
if ($1 == 4 && $2 == "Doris") c3++;
if ($1 == 5 && $2 == "Eve") c4++;
if ($1 == 6 && $2 == "Frank") c5++; }
END { exit !(c1 == 1 && c2 == 1 && c3 == 1 && c4 == 1 && c5 == 1); }'; then
diff -u ./e2e_test/sink/remote/mysql_expected_result.tsv \
<(mysql --host=mysql --port=3306 -u root -p123456 -s -N -r test -e "SELECT * FROM test.t_remote ORDER BY id")
if [ $? -eq 0 ]; then
echo "mysql sink check passed"
else
echo "The output is not as expected."
Expand Down
10 changes: 5 additions & 5 deletions e2e_test/sink/remote/jdbc.check.pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
query I
select * from t_remote order by id;
----
1 Alex
3 Carl
4 Doris
5 Eve
6 Frank
1 Alex 28208 281620391 4986480304337356800 28162.0391 2.03 28162.0391 2023-03-20 10:18:30
3 Carl 18300 1702307129 7878292368468104192 17023.07129 23.07 17023.07129 2023-03-20 10:18:32
4 Doris 17250 151951802 3946135584462581760 1519518.02 18.02 1519518.02 2023-03-21 10:18:30
5 Eve 9725 698160808 524334216698825600 69.8160808 69.81 69.8160808 2023-03-21 10:18:31
6 Frank 28131 1233587627 8492820454814063616 123358.7627 58.76 123358.7627 2023-03-21 10:18:32
24 changes: 20 additions & 4 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
statement ok
create table t_remote (id integer primary key, name varchar);
create table t_remote (
id integer primary key,
v_varchar varchar,
v_smallint smallint,
v_integer integer,
v_bigint bigint,
v_decimal decimal,
v_float float,
v_double double,
v_timestamp timestamp
);

statement ok
create materialized view mv_remote as select * from t_remote;
Expand All @@ -19,16 +29,22 @@ CREATE SINK s_mysql FROM mv_remote WITH (
);

statement ok
INSERT INTO t_remote VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Carl');
INSERT INTO t_remote VALUES
(1, 'Alice', 28208, 281620391, 4986480304337356659, 28162.0391, 2.03, 28162.0391, '2023-03-20 10:18:30'),
(2, 'Bob', 10580, 2131030003, 3074255027698877876, 21310.30003, 10.3, 21310.30003, '2023-03-20 10:18:31'),
(3, 'Carl', 18300, 1702307129, 7878292368468104216, 17023.07129, 23.07, 17023.07129, '2023-03-20 10:18:32');

statement ok
INSERT INTO t_remote VALUES (4, 'Doris'), (5, 'Eve'), (6, 'Frank');
INSERT INTO t_remote VALUES
(4, 'Doris', 17250, 151951802, 3946135584462581863, 1519518.02, 18.02, 1519518.02, '2023-03-21 10:18:30'),
(5, 'Eve', 9725, 698160808, 524334216698825611, 69.8160808, 69.81, 69.8160808, '2023-03-21 10:18:31'),
(6, 'Frank', 28131, 1233587627, 8492820454814063326, 123358.7627, 58.76, 123358.7627, '2023-03-21 10:18:32');

statement ok
FLUSH;

statement ok
UPDATE t_remote SET name = 'Alex' WHERE id = 1;
UPDATE t_remote SET v_varchar = 'Alex' WHERE id = 1;

statement ok
DELETE FROM t_remote WHERE id = 2;
Expand Down
11 changes: 11 additions & 0 deletions e2e_test/sink/remote/mysql_create_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE t_remote (
id integer PRIMARY KEY,
v_varchar varchar(100),
v_smallint smallint,
v_integer integer,
v_bigint bigint,
v_decimal decimal,
v_float float,
v_double double,
v_timestamp timestamp
);
5 changes: 5 additions & 0 deletions e2e_test/sink/remote/mysql_expected_result.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1 Alex 28208 281620391 4986480304337356800 28162 2.03 28162.0391 2023-03-20 10:18:30
3 Carl 18300 1702307129 7878292368468104192 17023 23.07 17023.07129 2023-03-20 10:18:32
4 Doris 17250 151951802 3946135584462581760 1519518 18.02 1519518.02 2023-03-21 10:18:30
5 Eve 9725 698160808 524334216698825600 70 69.81 69.8160808 2023-03-21 10:18:31
6 Frank 28131 1233587627 8492820454814063616 123359 58.76 123358.7627 2023-03-21 10:18:32
11 changes: 11 additions & 0 deletions e2e_test/sink/remote/pg_create_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE t_remote (
id integer PRIMARY KEY,
v_varchar varchar(100),
v_smallint smallint,
v_integer integer,
v_bigint bigint,
v_decimal decimal,
v_float real,
v_double double precision,
v_timestamp timestamp
);
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload;
import com.risingwave.proto.Data;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Map;

public class JsonDeserializer implements Deserializer {
Expand All @@ -31,6 +33,8 @@ public JsonDeserializer(TableSchema tableSchema) {
this.tableSchema = tableSchema;
}

// Encoding here should be consistent with `datum_to_json_object()` in
// src/connector/src/sink/mod.rs
@Override
public CloseableIterator<SinkRow> deserialize(
ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch) {
Expand Down Expand Up @@ -113,6 +117,19 @@ private static Double castDouble(Object value) {
}
}

private static BigDecimal castDecimal(Object value) {
if (value instanceof String) {
// FIXME(eric): See `datum_to_json_object()` in src/connector/src/sink/mod.rs
return new BigDecimal((String) value);
} else if (value instanceof BigDecimal) {
return (BigDecimal) value;
} else {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("unable to cast into double from " + value.getClass())
.asRuntimeException();
}
}

private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Object value) {
switch (typeName) {
case INT16:
Expand All @@ -132,13 +149,24 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj
return castDouble(value);
case FLOAT:
return castDouble(value).floatValue();
case DECIMAL:
return castDecimal(value);
case BOOLEAN:
if (!(value instanceof Boolean)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("Expected boolean, got " + value.getClass())
.asRuntimeException();
}
return value;
case TIMESTAMP:
case TIMESTAMPTZ:
if (!(value instanceof String)) {
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription(
"Expected timestamp in string, got " + value.getClass())
.asRuntimeException();
}
return Timestamp.valueOf((String) value);
default:
throw io.grpc.Status.INVALID_ARGUMENT
.withDescription("unsupported type " + typeName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public void validate(
jdbcPk.add(pkResultSet.getString("COLUMN_NAME"));
}
} catch (SQLException e) {
throw Status.INTERNAL.withCause(e).asRuntimeException();
throw Status.INVALID_ARGUMENT
.withDescription("failed to connect to target database: " + e.getSQLState())
.asRuntimeException();
}

if (!jdbcTableNames.contains(tableName)) {
Expand Down
5 changes: 0 additions & 5 deletions java/tools/maven/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,6 @@ This file is based on the checkstyle file of Apache Beam.
<!-- Enforce Java-style array declarations -->
<module name="ArrayTypeStyle"/>

<module name="TodoComment">
<!-- Checks that disallowed strings are not used in comments. -->
<property name="format" value="(FIXME)|(XXX)|(@author)"/>
</module>

<!--

IMPORT CHECKS
Expand Down