From 87d62b3ffa275c2a6597f43a308ca0265dd118a1 Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Mon, 15 Sep 2025 18:01:31 +0800 Subject: [PATCH 01/17] improve: struct tasks support batch_size --- dt-common/src/config/extractor_config.rs | 3 + dt-common/src/config/task_config.rs | 3 + .../extractor/mysql/mysql_struct_extractor.rs | 14 +++- dt-task/src/extractor_util.rs | 4 +- dt-task/src/task_runner.rs | 57 +++++++++++-- .../struct/batch_test/dst_prepare.sql | 5 ++ .../struct/batch_test/expect_ddl_8.0.sql | 72 ++++++++++++++++ .../struct/batch_test/src_prepare.sql | 84 +++++++++++++++++++ .../struct/batch_test/task_config.ini | 23 +++++ dt-tests/tests/mysql_to_mysql/struct_tests.rs | 6 ++ 10 files changed, 259 insertions(+), 12 deletions(-) create mode 100644 dt-tests/tests/mysql_to_mysql/struct/batch_test/dst_prepare.sql create mode 100644 dt-tests/tests/mysql_to_mysql/struct/batch_test/expect_ddl_8.0.sql create mode 100644 dt-tests/tests/mysql_to_mysql/struct/batch_test/src_prepare.sql create mode 100644 dt-tests/tests/mysql_to_mysql/struct/batch_test/task_config.ini diff --git a/dt-common/src/config/extractor_config.rs b/dt-common/src/config/extractor_config.rs index d528f38e..23c450cf 100644 --- a/dt-common/src/config/extractor_config.rs +++ b/dt-common/src/config/extractor_config.rs @@ -8,12 +8,15 @@ pub enum ExtractorConfig { MysqlStruct { url: String, db: String, + dbs: Vec, + batch_size: usize, }, PgStruct { url: String, schema: String, do_global_structs: bool, + batch_size: usize, }, MysqlSnapshot { diff --git a/dt-common/src/config/task_config.rs b/dt-common/src/config/task_config.rs index 4036bd1f..17b4b8cb 100644 --- a/dt-common/src/config/task_config.rs +++ b/dt-common/src/config/task_config.rs @@ -176,6 +176,8 @@ impl TaskConfig { ExtractType::Struct => ExtractorConfig::MysqlStruct { url, db: String::new(), + dbs: Vec::new(), + batch_size: loader.get_with_default(EXTRACTOR, BATCH_SIZE, 1), }, ExtractType::FoxlakeS3 => { @@ -234,6 +236,7 @@ impl TaskConfig { url, schema: String::new(), do_global_structs: false, + batch_size: loader.get_with_default(EXTRACTOR, BATCH_SIZE, 1), }, _ => bail! 
{ not_supported_err }, diff --git a/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs b/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs index a55c1eca..be38d6e9 100644 --- a/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs +++ b/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs @@ -18,14 +18,20 @@ pub struct MysqlStructExtractor { pub base_extractor: BaseExtractor, pub conn_pool: Pool<MySql>, pub db: String, + pub dbs: Vec<String>, pub filter: RdbFilter, } #[async_trait] impl Extractor for MysqlStructExtractor { async fn extract(&mut self) -> anyhow::Result<()> { - log_info!("MysqlStructExtractor starts, schema: {}", self.db,); - self.extract_internal().await?; + log_info!( + "MysqlStructExtractor starts, schemas: {}", + self.dbs.join(",") + ); + for db in self.dbs.clone().iter() { + self.extract_internal(db).await?; + } self.base_extractor.wait_task_finish().await } @@ -35,11 +41,11 @@ impl Extractor for MysqlStructExtractor { } impl MysqlStructExtractor { - pub async fn extract_internal(&mut self) -> anyhow::Result<()> { + pub async fn extract_internal(&mut self, db: &String) -> anyhow::Result<()> { let meta_manager = MysqlMetaManager::new(self.conn_pool.clone()).await?; let mut fetcher = MysqlStructFetcher { conn_pool: self.conn_pool.to_owned(), - db: self.db.clone(), + db: db.clone(), filter: Some(self.filter.to_owned()), meta_manager, }; diff --git a/dt-task/src/extractor_util.rs b/dt-task/src/extractor_util.rs index bef7c83a..43546ba4 100644 --- a/dt-task/src/extractor_util.rs +++ b/dt-task/src/extractor_util.rs @@ -332,13 +332,14 @@ impl ExtractorUtil { Box::new(extractor) } - ExtractorConfig::MysqlStruct { url, db } => { + ExtractorConfig::MysqlStruct { url, db, dbs, .. } => { // TODO, pass max_connections as parameter let conn_pool = TaskUtil::create_mysql_conn_pool(&url, 2, enable_sqlx_log, false).await?; let extractor = MysqlStructExtractor { conn_pool, db, + dbs, filter, base_extractor, }; @@ -349,6 +350,7 @@ impl ExtractorUtil { url, schema, do_global_structs, + batch_size, } => { // TODO, pass max_connections as parameter let conn_pool = diff --git a/dt-task/src/task_runner.rs b/dt-task/src/task_runner.rs index a09c70db..1488273b 100644 --- a/dt-task/src/task_runner.rs +++ b/dt-task/src/task_runner.rs @@ -186,22 +186,65 @@ impl TaskRunner { self.task_monitor .add_no_window_metrics(TaskMetricsType::ExtractorPlanRecords, record_count); } - + let struct_batch_size = match &self.config.extractor { + ExtractorConfig::MysqlStruct { batch_size, .. } + | ExtractorConfig::PgStruct { batch_size, .. } => Some(*batch_size), + _ => None, + }; + if let Some(batch_size) = struct_batch_size { + if batch_size <= 0 { + bail!("batch_size must be greater than 0") + } + let schema_chunks: Vec<Vec<String>> = schemas + .chunks(batch_size) + .map(|chunk| chunk.to_vec()) + .collect(); + let extractor_configs: Vec<ExtractorConfig> = match &self.config.extractor { + ExtractorConfig::MysqlStruct { + url, + db, + batch_size, + ..
+ } => { + let mut extractor_configs = Vec::new(); + for schema_chunk in schema_chunks.iter() { + let extractor_config = ExtractorConfig::MysqlStruct { + url: url.clone(), + db: db.clone(), + dbs: schema_chunk.clone(), + batch_size: *batch_size, + }; + extractor_configs.push(extractor_config); + } + extractor_configs + } + _ => vec![], + }; + for extractor_config in &extractor_configs { + self.clone() + .start_single_task( + extractor_config, + router, + snapshot_resumer, + cdc_resumer, + true, + ) + .await?; + } + } // TODO: Need to limit resources when starting tasks concurrently at schema level. // Currently connection count, rate limit, buffer size, etc. are controlled at single task level, // which in multi-task mode will amplify these resources by at least schema count times for (flag, schema) in schemas.iter().enumerate() { // start a task for each schema let schema_extractor_config = match &self.config.extractor { - ExtractorConfig::MysqlStruct { url, .. } => Some(ExtractorConfig::MysqlStruct { - url: url.clone(), - db: schema.clone(), - }), - - ExtractorConfig::PgStruct { url, .. } => Some(ExtractorConfig::PgStruct { + ExtractorConfig::PgStruct { + url, batch_size, .. + } => Some(ExtractorConfig::PgStruct { url: url.clone(), schema: schema.clone(), do_global_structs: flag == 0, + batch_size: *batch_size, }), _ => None, diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/dst_prepare.sql new file mode 100644 index 00000000..25168537 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/dst_prepare.sql @@ -0,0 +1,5 @@ +drop database if exists struct_it_mysql2mysql_1; +drop database if exists struct_it_mysql2mysql_2; +drop database if exists struct_it_mysql2mysql_3; +drop database if exists struct_it_mysql2mysql_4; +drop database if exists struct_it_mysql2mysql_5; \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/expect_ddl_8.0.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/expect_ddl_8.0.sql new file mode 100644 index 00000000..d70b0f19 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/expect_ddl_8.0.sql @@ -0,0 +1,72 @@ +struct_it_mysql2mysql_1 +CREATE DATABASE `struct_it_mysql2mysql_1` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci */ /*!80016 DEFAULT ENCRYPTION='N' */ + +struct_it_mysql2mysql_1.expression_defaults +CREATE TABLE `expression_defaults` ( + `i` int DEFAULT '0', + `c` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci DEFAULT '', + `f` float DEFAULT ((rand() * rand())), + `b` binary(16) DEFAULT (uuid_to_bin(uuid())), + `d` date DEFAULT ((curdate() + interval 1 year)), + `p` point DEFAULT (point(0,0)), + `j` json DEFAULT (json_array()) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_520_ci + +struct_it_mysql2mysql_2 +CREATE DATABASE `struct_it_mysql2mysql_2` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci */ /*!80016 DEFAULT ENCRYPTION='N' */ + +struct_it_mysql2mysql_2.expression_defaults +CREATE TABLE `expression_defaults` ( + `i` int DEFAULT '0', + `c` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci DEFAULT '', + `f` float DEFAULT ((rand() * rand())), + `b` binary(16) DEFAULT (uuid_to_bin(uuid())), + `d` date DEFAULT ((curdate() + interval 1 year)), + `p` point DEFAULT (point(0,0)), + `j` json DEFAULT (json_array()) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_520_ci + + +struct_it_mysql2mysql_3 +CREATE 
DATABASE `struct_it_mysql2mysql_3` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci */ /*!80016 DEFAULT ENCRYPTION='N' */ + +struct_it_mysql2mysql_3.expression_defaults +CREATE TABLE `expression_defaults` ( + `i` int DEFAULT '0', + `c` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci DEFAULT '', + `f` float DEFAULT ((rand() * rand())), + `b` binary(16) DEFAULT (uuid_to_bin(uuid())), + `d` date DEFAULT ((curdate() + interval 1 year)), + `p` point DEFAULT (point(0,0)), + `j` json DEFAULT (json_array()) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_520_ci + + +struct_it_mysql2mysql_4 +CREATE DATABASE `struct_it_mysql2mysql_4` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci */ /*!80016 DEFAULT ENCRYPTION='N' */ + +struct_it_mysql2mysql_4.expression_defaults +CREATE TABLE `expression_defaults` ( + `i` int DEFAULT '0', + `c` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci DEFAULT '', + `f` float DEFAULT ((rand() * rand())), + `b` binary(16) DEFAULT (uuid_to_bin(uuid())), + `d` date DEFAULT ((curdate() + interval 1 year)), + `p` point DEFAULT (point(0,0)), + `j` json DEFAULT (json_array()) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_520_ci + + +struct_it_mysql2mysql_5 +CREATE DATABASE `struct_it_mysql2mysql_5` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci */ /*!80016 DEFAULT ENCRYPTION='N' */ + +struct_it_mysql2mysql_5.expression_defaults +CREATE TABLE `expression_defaults` ( + `i` int DEFAULT '0', + `c` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci DEFAULT '', + `f` float DEFAULT ((rand() * rand())), + `b` binary(16) DEFAULT (uuid_to_bin(uuid())), + `d` date DEFAULT ((curdate() + interval 1 year)), + `p` point DEFAULT (point(0,0)), + `j` json DEFAULT (json_array()) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_520_ci \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/src_prepare.sql new file mode 100644 index 00000000..fb269305 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/src_prepare.sql @@ -0,0 +1,84 @@ +drop database if exists struct_it_mysql2mysql_1; +drop database if exists struct_it_mysql2mysql_2; +drop database if exists struct_it_mysql2mysql_3; +drop database if exists struct_it_mysql2mysql_4; +drop database if exists struct_it_mysql2mysql_5; + +create database if not exists struct_it_mysql2mysql_1 CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci; +create database if not exists struct_it_mysql2mysql_2 CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci; +create database if not exists struct_it_mysql2mysql_3 CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci; +create database if not exists struct_it_mysql2mysql_4 CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci; +create database if not exists struct_it_mysql2mysql_5 CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci; + +-- expression defaults are surpported in mysql 8.0 +-- https://dev.mysql.com/doc/refman/8.0/en/data-type-defaults.html +-- The BLOB, TEXT, GEOMETRY, and JSON data types cannot be assigned a default value. 
+``` +CREATE TABLE struct_it_mysql2mysql_1.expression_defaults ( + -- literal defaults + i INT DEFAULT 0, + c VARCHAR(10) DEFAULT '', + -- expression defaults + f FLOAT DEFAULT (RAND() * RAND()), + b BINARY(16) DEFAULT (UUID_TO_BIN(UUID())), + d DATE DEFAULT (CURRENT_DATE + INTERVAL 1 YEAR), + p POINT DEFAULT (Point(0,0)), + j JSON DEFAULT (JSON_ARRAY()) +); +``` + +``` +CREATE TABLE struct_it_mysql2mysql_2.expression_defaults ( + -- literal defaults + i INT DEFAULT 0, + c VARCHAR(10) DEFAULT '', + -- expression defaults + f FLOAT DEFAULT (RAND() * RAND()), + b BINARY(16) DEFAULT (UUID_TO_BIN(UUID())), + d DATE DEFAULT (CURRENT_DATE + INTERVAL 1 YEAR), + p POINT DEFAULT (Point(0,0)), + j JSON DEFAULT (JSON_ARRAY()) +); +``` + +``` +CREATE TABLE struct_it_mysql2mysql_3.expression_defaults ( + -- literal defaults + i INT DEFAULT 0, + c VARCHAR(10) DEFAULT '', + -- expression defaults + f FLOAT DEFAULT (RAND() * RAND()), + b BINARY(16) DEFAULT (UUID_TO_BIN(UUID())), + d DATE DEFAULT (CURRENT_DATE + INTERVAL 1 YEAR), + p POINT DEFAULT (Point(0,0)), + j JSON DEFAULT (JSON_ARRAY()) +); +``` + +``` +CREATE TABLE struct_it_mysql2mysql_4.expression_defaults ( + -- literal defaults + i INT DEFAULT 0, + c VARCHAR(10) DEFAULT '', + -- expression defaults + f FLOAT DEFAULT (RAND() * RAND()), + b BINARY(16) DEFAULT (UUID_TO_BIN(UUID())), + d DATE DEFAULT (CURRENT_DATE + INTERVAL 1 YEAR), + p POINT DEFAULT (Point(0,0)), + j JSON DEFAULT (JSON_ARRAY()) +); +``` + +``` +CREATE TABLE struct_it_mysql2mysql_5.expression_defaults ( + -- literal defaults + i INT DEFAULT 0, + c VARCHAR(10) DEFAULT '', + -- expression defaults + f FLOAT DEFAULT (RAND() * RAND()), + b BINARY(16) DEFAULT (UUID_TO_BIN(UUID())), + d DATE DEFAULT (CURRENT_DATE + INTERVAL 1 YEAR), + p POINT DEFAULT (Point(0,0)), + j JSON DEFAULT (JSON_ARRAY()) +); +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/task_config.ini b/dt-tests/tests/mysql_to_mysql/struct/batch_test/task_config.ini new file mode 100644 index 00000000..f29a983f --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/task_config.ini @@ -0,0 +1,23 @@ +[extractor] +extract_type=struct +db_type=mysql +url={mysql_extractor_url_8_0} +batch_size=2 + +[sinker] +sink_type=struct +db_type=mysql +batch_size=1 +url={mysql_sinker_url_8_0} +conflict_policy=interrupt + +[filter] +do_dbs=struct_it_mysql2mysql_* + +[parallelizer] +parallel_type=serial +parallel_size=1 + +[pipeline] +checkpoint_interval_secs=1 +buffer_size=100 \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct_tests.rs b/dt-tests/tests/mysql_to_mysql/struct_tests.rs index 96de4fbb..a1578bcf 100644 --- a/dt-tests/tests/mysql_to_mysql/struct_tests.rs +++ b/dt-tests/tests/mysql_to_mysql/struct_tests.rs @@ -48,4 +48,10 @@ mod test { async fn struct_8_0_basic_test() { TestBase::run_mysql_struct_test("mysql_to_mysql/struct/8_0_basic_test").await; } + + #[tokio::test] + #[serial] + async fn struct_batch_test() { + TestBase::run_mysql_struct_test("mysql_to_mysql/struct/batch_test").await; + } } From 996e590bbe6fd6b53f7eb7292636672436ee90f4 Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Mon, 15 Sep 2025 19:32:30 +0800 Subject: [PATCH 02/17] pg struct --- dt-common/src/config/extractor_config.rs | 1 + dt-common/src/config/task_config.rs | 1 + .../src/extractor/pg/pg_struct_extractor.rs | 14 +++- dt-task/src/extractor_util.rs | 4 +- dt-task/src/task_runner.rs | 79 +++++++++---------- 5 files changed, 51 insertions(+), 48 deletions(-) 
diff --git a/dt-common/src/config/extractor_config.rs b/dt-common/src/config/extractor_config.rs index 23c450cf..80af8e28 100644 --- a/dt-common/src/config/extractor_config.rs +++ b/dt-common/src/config/extractor_config.rs @@ -15,6 +15,7 @@ pub enum ExtractorConfig { PgStruct { url: String, schema: String, + schemas: Vec<String>, do_global_structs: bool, batch_size: usize, }, diff --git a/dt-common/src/config/task_config.rs b/dt-common/src/config/task_config.rs index 17b4b8cb..5a3ca12a 100644 --- a/dt-common/src/config/task_config.rs +++ b/dt-common/src/config/task_config.rs @@ -235,6 +235,7 @@ impl TaskConfig { ExtractType::Struct => ExtractorConfig::PgStruct { url, schema: String::new(), + schemas: Vec::new(), do_global_structs: false, batch_size: loader.get_with_default(EXTRACTOR, BATCH_SIZE, 1), }, diff --git a/dt-connector/src/extractor/pg/pg_struct_extractor.rs b/dt-connector/src/extractor/pg/pg_struct_extractor.rs index 4417b039..fdb8e345 100644 --- a/dt-connector/src/extractor/pg/pg_struct_extractor.rs +++ b/dt-connector/src/extractor/pg/pg_struct_extractor.rs @@ -17,6 +17,7 @@ pub struct PgStructExtractor { pub base_extractor: BaseExtractor, pub conn_pool: Pool<Postgres>, pub schema: String, + pub schemas: Vec<String>, pub do_global_structs: bool, pub filter: RdbFilter, } @@ -24,8 +25,13 @@ pub struct PgStructExtractor { #[async_trait] impl Extractor for PgStructExtractor { async fn extract(&mut self) -> anyhow::Result<()> { - log_info!("PgStructExtractor starts, schema: {}", self.schema); - self.extract_internal().await?; + log_info!( + "PgStructExtractor starts, schemas: {}", + self.schemas.join(",") + ); + for schema in self.schemas.clone().iter() { + self.extract_internal(schema).await?; + } self.base_extractor.wait_task_finish().await } @@ -35,10 +41,10 @@ impl Extractor for PgStructExtractor { } impl PgStructExtractor { - pub async fn extract_internal(&mut self) -> anyhow::Result<()> { + pub async fn extract_internal(&mut self, schema: &String) -> anyhow::Result<()> { let mut pg_fetcher = PgStructFetcher { conn_pool: self.conn_pool.to_owned(), - schema: self.schema.clone(), + schema: schema.clone(), filter: Some(self.filter.to_owned()), }; diff --git a/dt-task/src/extractor_util.rs b/dt-task/src/extractor_util.rs index 43546ba4..a2aa199d 100644 --- a/dt-task/src/extractor_util.rs +++ b/dt-task/src/extractor_util.rs @@ -349,8 +349,9 @@ impl ExtractorUtil { ExtractorConfig::PgStruct { url, schema, + schemas, do_global_structs, - batch_size, + .. } => { // TODO, pass max_connections as parameter let conn_pool = @@ -358,6 +359,7 @@ impl ExtractorUtil { let extractor = PgStructExtractor { conn_pool, schema, + schemas, do_global_structs, filter, base_extractor, }; diff --git a/dt-task/src/task_runner.rs b/dt-task/src/task_runner.rs index 1488273b..be25a380 100644 --- a/dt-task/src/task_runner.rs +++ b/dt-task/src/task_runner.rs @@ -218,6 +218,25 @@ impl TaskRunner { } extractor_configs } + ExtractorConfig::PgStruct { + url, + schema, + batch_size, + ..
+ } => { + let mut extractor_configs = Vec::new(); + for (flag, schema_chunk) in schema_chunks.iter().enumerate() { + let extractor_config = ExtractorConfig::PgStruct { + url: url.clone(), + schema: schema.clone(), + schemas: schema_chunk.clone(), + do_global_structs: flag == 0, + batch_size: *batch_size, + }; + extractor_configs.push(extractor_config); + } + extractor_configs + } _ => vec![], }; for extractor_config in &extractor_configs { @@ -231,50 +250,24 @@ impl TaskRunner { ) .await?; } - } - // TODO: Need to limit resources when starting tasks concurrently at schema level. - // Currently connection count, rate limit, buffer size, etc. are controlled at single task level, - // which in multi-task mode will amplify these resources by at least schema count times - for (flag, schema) in schemas.iter().enumerate() { - // start a task for each schema - let schema_extractor_config = match &self.config.extractor { - ExtractorConfig::PgStruct { - url, batch_size, .. - } => Some(ExtractorConfig::PgStruct { - url: url.clone(), - schema: schema.clone(), - do_global_structs: flag == 0, - batch_size: *batch_size, - }), - - _ => None, - }; - - if let Some(extractor_config) = schema_extractor_config { - self.clone() - .start_single_task( - &extractor_config, - router, - snapshot_resumer, - cdc_resumer, - true, - ) - .await?; - continue; - } - - // find pending tables - let tbs = TaskUtil::list_tbs(url, schema, db_type).await?; - for tb in tbs.iter() { - if snapshot_resumer.check_finished(schema, tb) { - log_info!("schema: {}, tb: {}, already finished", schema, tb); - continue; - } - if filter.filter_event(schema, tb, &RowType::Insert) { - log_info!("schema: {}, tb: {}, insert events filtered", schema, tb); - continue; + } else { + // TODO: Need to limit resources when starting tasks concurrently at schema level. + // Currently connection count, rate limit, buffer size, etc. 
are controlled at single task level, + // which in multi-task mode will amplify these resources by at least schema count times + for schema in schemas.iter() { + // find pending tables + let tbs = TaskUtil::list_tbs(url, schema, db_type).await?; + for tb in tbs.iter() { + if snapshot_resumer.check_finished(schema, tb) { + log_info!("schema: {}, tb: {}, already finished", schema, tb); + continue; + } + if filter.filter_event(schema, tb, &RowType::Insert) { + log_info!("schema: {}, tb: {}, insert events filtered", schema, tb); + continue; + } + pending_tbs.push_back((schema.to_owned(), tb.to_owned())); } - pending_tbs.push_back((schema.to_owned(), tb.to_owned())); } } From 4ed9a134ed81a06c2c2021621d17443e69798176 Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Mon, 15 Sep 2025 20:01:28 +0800 Subject: [PATCH 03/17] fix --- .../src/extractor/pg/pg_struct_extractor.rs | 8 +- dt-task/src/task_runner.rs | 6 +- .../pg_to_pg/struct/batch_test/dst_clean.sql | 1 + .../struct/batch_test/dst_prepare.sql | 3 + .../pg_to_pg/struct/batch_test/src_clean.sql | 1 + .../struct/batch_test/src_prepare.sql | 188 ++++++++++++++++++ .../struct/batch_test/task_config.ini | 39 ++++ dt-tests/tests/pg_to_pg/struct_tests.rs | 6 + 8 files changed, 245 insertions(+), 7 deletions(-) create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/dst_clean.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/src_clean.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini diff --git a/dt-connector/src/extractor/pg/pg_struct_extractor.rs b/dt-connector/src/extractor/pg/pg_struct_extractor.rs index fdb8e345..ee2e7fca 100644 --- a/dt-connector/src/extractor/pg/pg_struct_extractor.rs +++ b/dt-connector/src/extractor/pg/pg_struct_extractor.rs @@ -29,8 +29,8 @@ impl Extractor for PgStructExtractor { "PgStructExtractor starts, schemas: {}", self.schemas.join(",") ); - for schema in self.schemas.clone().iter() { - self.extract_internal(schema).await?; + for (flag, schema) in self.schemas.clone().iter().enumerate() { + self.extract_internal(schema, self.do_global_structs && flag == 0).await?; } self.base_extractor.wait_task_finish().await } @@ -41,7 +41,7 @@ impl Extractor for PgStructExtractor { } impl PgStructExtractor { - pub async fn extract_internal(&mut self, schema: &String) -> anyhow::Result<()> { + pub async fn extract_internal(&mut self, schema: &String, do_global_structs: bool) -> anyhow::Result<()> { let mut pg_fetcher = PgStructFetcher { conn_pool: self.conn_pool.to_owned(), schema: schema.clone(), @@ -59,7 +59,7 @@ impl PgStructExtractor { .await?; } - if self.do_global_structs && !self.filter.filter_structure(&StructureType::Rbac) { + if do_global_structs && !self.filter.filter_structure(&StructureType::Rbac) { // do rbac init let rbac_statements = pg_fetcher.get_create_rbac_statements().await?; for statement in rbac_statements { diff --git a/dt-task/src/task_runner.rs b/dt-task/src/task_runner.rs index be25a380..8219eda6 100644 --- a/dt-task/src/task_runner.rs +++ b/dt-task/src/task_runner.rs @@ -191,12 +191,12 @@ impl TaskRunner { | ExtractorConfig::PgStruct { batch_size, .. 
} => Some(*batch_size), _ => None, }; - if let Some(batch_size) = struct_batch_size { - if batch_size <= 0 { + if let Some(struct_batch_size) = struct_batch_size { + if struct_batch_size == 0 { bail!("batch_size must be greater than 0") } let schema_chunks: Vec> = schemas - .chunks(batch_size) + .chunks(struct_batch_size) .map(|chunk| chunk.to_vec()) .collect(); let extractor_configs: Vec = match &self.config.extractor { diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/dst_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/dst_clean.sql new file mode 100644 index 00000000..e88cc948 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/dst_clean.sql @@ -0,0 +1 @@ +-- drop schema if exists struct_it_pg2pg_1 CASCADE; \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql new file mode 100644 index 00000000..386024c9 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql @@ -0,0 +1,3 @@ +drop schema if exists struct_it_pg2pg_1 CASCADE; + +-- create schema struct_it_pg2pg_1; diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/src_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/src_clean.sql new file mode 100644 index 00000000..e88cc948 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/src_clean.sql @@ -0,0 +1 @@ +-- drop schema if exists struct_it_pg2pg_1 CASCADE; \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql new file mode 100644 index 00000000..b70fa66b --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql @@ -0,0 +1,188 @@ +drop schema if exists struct_it_pg2pg_1 CASCADE; + +create schema struct_it_pg2pg_1; + +-- all basic column types: +``` +CREATE TABLE struct_it_pg2pg_1.full_column_type ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + char_col CHAR(10), + text_col TEXT, + boolean_col BOOLEAN, + smallint_col SMALLINT, + integer_col INTEGER, + bigint_col BIGINT, + decimal_col DECIMAL(10, 2), + numeric_col NUMERIC(10, 2), + real_col REAL, + double_precision_col DOUBLE PRECISION, + date_col DATE, + time_col TIME, + timestamp_col TIMESTAMP, + interval_col INTERVAL, + bytea_col BYTEA, + uuid_col UUID, + xml_col XML, + json_col JSON, + jsonb_col JSONB, + point_col POINT, + line_col LINE, + lseg_col LSEG, + box_col BOX, + path_col PATH, + polygon_col POLYGON, + circle_col CIRCLE +); +``` + +-- array column types: +-- CREATE TABLE struct_it_pg2pg_1.array_table (pk SERIAL, int_array INT[], bigint_array BIGINT[], text_array TEXT[], char_array CHAR(10)[], varchar_array VARCHAR(10)[], date_array DATE[], numeric_array NUMERIC(10, 2)[], varnumeric_array NUMERIC[3], citext_array CITEXT[], inet_array INET[], cidr_array CIDR[], macaddr_array MACADDR[], tsrange_array TSRANGE[], tstzrange_array TSTZRANGE[], daterange_array DATERANGE[], int4range_array INT4RANGE[],numerange_array NUMRANGE[], int8range_array INT8RANGE[], uuid_array UUID[], json_array json[], jsonb_array jsonb[], oid_array OID[], PRIMARY KEY(pk)); + +-- postgres 12, without: CITEXT[] +``` +CREATE TABLE struct_it_pg2pg_1.array_table ( + pk SERIAL, + int_array INT[], + bigint_array BIGINT[], + text_array TEXT[], + char_array CHAR(10) [], + varchar_array VARCHAR(10) [], + date_array DATE[], + numeric_array NUMERIC(10, 2) [], + varnumeric_array NUMERIC[3], + inet_array INET[], + cidr_array CIDR[], + macaddr_array MACADDR[], + 
tsrange_array TSRANGE[], + tstzrange_array TSTZRANGE[], + daterange_array DATERANGE[], + int4range_array INT4RANGE[], + numerange_array NUMRANGE[], + int8range_array INT8RANGE[], + uuid_array UUID[], + json_array json[], + jsonb_array jsonb[], + oid_array OID[], + PRIMARY KEY(pk) +); +``` + +-- all check types(without fk and exclude): +``` +CREATE TABLE struct_it_pg2pg_1.full_constraint_type ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + unique_col VARCHAR(255) UNIQUE, + not_null_col VARCHAR(255) NOT NULL, + check_col VARCHAR(255) CHECK (char_length(check_col) > 3) +); +``` + +-- all index types: +``` +CREATE TABLE struct_it_pg2pg_1.full_index_type ( + id SERIAL PRIMARY KEY, + unique_col VARCHAR(255) NOT NULL, + index_col VARCHAR(255), + fulltext_col TSVECTOR, + spatial_col POINT NOT NULL, + simple_index_col VARCHAR(255), + composite_index_col1 VARCHAR(255), + composite_index_col2 VARCHAR(255), + composite_index_col3 VARCHAR(255) +); +``` + +CREATE UNIQUE INDEX unique_index ON struct_it_pg2pg_1.full_index_type (unique_col); + +CREATE INDEX index_index ON struct_it_pg2pg_1.full_index_type (index_col); + +CREATE INDEX fulltext_index ON struct_it_pg2pg_1.full_index_type USING gin(fulltext_col); + +CREATE INDEX spatial_index ON struct_it_pg2pg_1.full_index_type USING gist(spatial_col); + +CREATE INDEX simple_index ON struct_it_pg2pg_1.full_index_type (simple_index_col); + +``` +CREATE INDEX composite_index ON struct_it_pg2pg_1.full_index_type ( + composite_index_col1, composite_index_col2, + composite_index_col3 +); +``` + +-- table comments: +COMMENT ON TABLE struct_it_pg2pg_1.full_column_type IS 'Comment on full_column_type.'; +COMMENT ON TABLE struct_it_pg2pg_1.full_index_type IS 'Comment on full_index_type.'; + +-- column comments: +COMMENT ON COLUMN struct_it_pg2pg_1.full_column_type.id IS 'Comment on full_column_type.id.'; +COMMENT ON COLUMN struct_it_pg2pg_1.full_index_type.id IS 'Comment on full_index_type.id.'; + +-- sequences + +-- case 1: sequeces created automatically when creating table +CREATE TABLE struct_it_pg2pg_1.sequence_test_1 (seq_1 SERIAL, seq_2 BIGSERIAL, seq_3 SMALLSERIAL); + +-- case 2: create independent sequences, then alter their owners +CREATE SEQUENCE struct_it_pg2pg_1.sequence_test_2_seq_1; +CREATE SEQUENCE struct_it_pg2pg_1.sequence_test_2_seq_2; +CREATE SEQUENCE struct_it_pg2pg_1.sequence_test_2_seq_3; + +CREATE TABLE struct_it_pg2pg_1.sequence_test_2 (seq_1 INTEGER, seq_2 BIGINT, seq_3 SMALLINT); + +-- in postgres, sequence must be in same schema as table it is linked to +-- actually, postgres allows mutiple sequences owned by the same table.column, here we just ignore +ALTER SEQUENCE struct_it_pg2pg_1.sequence_test_2_seq_1 OWNED BY struct_it_pg2pg_1.sequence_test_2.seq_1; +ALTER SEQUENCE struct_it_pg2pg_1.sequence_test_2_seq_2 OWNED BY struct_it_pg2pg_1.sequence_test_2.seq_2; +ALTER SEQUENCE struct_it_pg2pg_1.sequence_test_2_seq_3 OWNED BY struct_it_pg2pg_1.sequence_test_2.seq_3; + +-- case 3: create independent sequences, use them in column defaults without ownership +-- we should migrate these sequences +CREATE SEQUENCE struct_it_pg2pg_1.sequence_test_3_seq_2; +CREATE SEQUENCE struct_it_pg2pg_1."sequence_test_3_seq.\d@_3"; + +``` +CREATE TABLE struct_it_pg2pg_1.sequence_test_3 ( + seq_1 SERIAL, + seq_2 BIGINT DEFAULT nextval('struct_it_pg2pg_1.sequence_test_3_seq_2'), + seq_3 SMALLINT DEFAULT nextval('struct_it_pg2pg_1."sequence_test_3_seq.\d@_3"') +); +``` + +-- case 4: create independent sequences and never used by any tables +-- we 
should not migrate them +CREATE SEQUENCE struct_it_pg2pg_1.sequence_test_4_seq_1; + +-- for case 1 & 2, the sequence ownership can be got by below sql + +-- SELECT seq.relname, +-- tab.relname AS table_name, +-- attr.attname AS column_name, +-- ns.nspname +-- FROM pg_class AS seq +-- JOIN pg_namespace ns +-- ON (seq.relnamespace = ns.oid) +-- JOIN pg_depend AS dep +-- ON (seq.relfilenode = dep.objid) +-- JOIN pg_class AS tab +-- ON (dep.refobjid = tab.relfilenode) +-- JOIN pg_attribute AS attr +-- ON (attr.attnum = dep.refobjsubid AND attr.attrelid = dep.refobjid) +-- WHERE dep.deptype='a' +-- AND seq.relkind='S' +-- AND ns.nspname = 'struct_it_pg2pg_1'; + +-- test view filtered +CREATE VIEW struct_it_pg2pg_1.full_column_type_view AS SELECT * FROM struct_it_pg2pg_1.full_column_type; + +-- special character +``` +CREATE TABLE struct_it_pg2pg_1."special_character_$1#@*_table" ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + unique_col VARCHAR(255) UNIQUE, + not_null_col VARCHAR(255) NOT NULL, + check_col VARCHAR(255) CHECK (char_length(check_col) > 3) +); +``` \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini new file mode 100644 index 00000000..ca60b1b7 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini @@ -0,0 +1,39 @@ +[extractor] +extract_type=struct +db_type=pg +url={pg_extractor_url} +batch_size=2 + +[sinker] +sink_type=struct +db_type=pg +batch_size=1 +url={pg_sinker_url} +conflict_policy=interrupt +# conflict_policy=ignore + +[filter] +do_dbs=struct_it_pg2pg_1 +ignore_dbs= +do_tbs= +ignore_tbs= +do_events= +do_structures=database,table,constraint,sequence,comment,index + +[router] +db_map= +tb_map= +col_map= + +[runtime] +log_level=info +log4rs_file=./log4rs.yaml +log_dir=./logs + +[parallelizer] +parallel_type=serial +parallel_size=1 + +[pipeline] +checkpoint_interval_secs=1 +buffer_size=100 \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct_tests.rs b/dt-tests/tests/pg_to_pg/struct_tests.rs index 3569f214..560b005f 100644 --- a/dt-tests/tests/pg_to_pg/struct_tests.rs +++ b/dt-tests/tests/pg_to_pg/struct_tests.rs @@ -50,4 +50,10 @@ mod test { runner.run_struct_test_without_check().await.unwrap(); TestBase::run_dcl_check_test("pg_to_pg/struct/rbac_test").await; } + + #[tokio::test] + #[serial] + async fn struct_batch_test() { + TestBase::run_pg_struct_test("pg_to_pg/struct/batch_test").await; + } } From 7e4a74f8b46dcbfa0c0e65b08cdce988d90c5a24 Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Tue, 16 Sep 2025 14:11:32 +0800 Subject: [PATCH 04/17] fix test --- .../src/extractor/pg/pg_struct_extractor.rs | 9 +- .../struct/batch_test/dst_prepare.sql | 4 + .../struct/batch_test/src_prepare.sql | 336 +++++++++++++++++- .../struct/batch_test/task_config.ini | 2 +- 4 files changed, 347 insertions(+), 4 deletions(-) diff --git a/dt-connector/src/extractor/pg/pg_struct_extractor.rs b/dt-connector/src/extractor/pg/pg_struct_extractor.rs index ee2e7fca..d300e581 100644 --- a/dt-connector/src/extractor/pg/pg_struct_extractor.rs +++ b/dt-connector/src/extractor/pg/pg_struct_extractor.rs @@ -30,7 +30,8 @@ impl Extractor for PgStructExtractor { self.schemas.join(",") ); for (flag, schema) in self.schemas.clone().iter().enumerate() { - self.extract_internal(schema, self.do_global_structs && flag == 0).await?; + self.extract_internal(schema, self.do_global_structs && flag == 0) + .await?; } 
self.base_extractor.wait_task_finish().await } @@ -41,7 +42,11 @@ impl Extractor for PgStructExtractor { } impl PgStructExtractor { - pub async fn extract_internal(&mut self, schema: &String, do_global_structs: bool) -> anyhow::Result<()> { + pub async fn extract_internal( + &mut self, + schema: &String, + do_global_structs: bool, + ) -> anyhow::Result<()> { let mut pg_fetcher = PgStructFetcher { conn_pool: self.conn_pool.to_owned(), schema: schema.clone(), diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql index 386024c9..341c62e3 100644 --- a/dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql @@ -1,3 +1,7 @@ drop schema if exists struct_it_pg2pg_1 CASCADE; +drop schema if exists struct_it_pg2pg_2 CASCADE; + +drop schema if exists struct_it_pg2pg_3 CASCADE; + -- create schema struct_it_pg2pg_1; diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql index b70fa66b..bba3e515 100644 --- a/dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql @@ -185,4 +185,338 @@ CREATE TABLE struct_it_pg2pg_1."special_character_$1#@*_table" ( not_null_col VARCHAR(255) NOT NULL, check_col VARCHAR(255) CHECK (char_length(check_col) > 3) ); -``` \ No newline at end of file +``` + +drop schema if exists struct_it_pg2pg_2 CASCADE; + +create schema struct_it_pg2pg_2; + +-- all basic column types: +``` +CREATE TABLE struct_it_pg2pg_2.full_column_type ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + char_col CHAR(10), + text_col TEXT, + boolean_col BOOLEAN, + smallint_col SMALLINT, + integer_col INTEGER, + bigint_col BIGINT, + decimal_col DECIMAL(10, 2), + numeric_col NUMERIC(10, 2), + real_col REAL, + double_precision_col DOUBLE PRECISION, + date_col DATE, + time_col TIME, + timestamp_col TIMESTAMP, + interval_col INTERVAL, + bytea_col BYTEA, + uuid_col UUID, + xml_col XML, + json_col JSON, + jsonb_col JSONB, + point_col POINT, + line_col LINE, + lseg_col LSEG, + box_col BOX, + path_col PATH, + polygon_col POLYGON, + circle_col CIRCLE +); +``` + +-- array column types: +``` +CREATE TABLE struct_it_pg2pg_2.array_table ( + pk SERIAL, + int_array INT[], + bigint_array BIGINT[], + text_array TEXT[], + char_array CHAR(10) [], + varchar_array VARCHAR(10) [], + date_array DATE[], + numeric_array NUMERIC(10, 2) [], + varnumeric_array NUMERIC[3], + inet_array INET[], + cidr_array CIDR[], + macaddr_array MACADDR[], + tsrange_array TSRANGE[], + tstzrange_array TSTZRANGE[], + daterange_array DATERANGE[], + int4range_array INT4RANGE[], + numerange_array NUMRANGE[], + int8range_array INT8RANGE[], + uuid_array UUID[], + json_array json[], + jsonb_array jsonb[], + oid_array OID[], + PRIMARY KEY(pk) +); +``` + +-- all check types(without fk and exclude): +``` +CREATE TABLE struct_it_pg2pg_2.full_constraint_type ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + unique_col VARCHAR(255) UNIQUE, + not_null_col VARCHAR(255) NOT NULL, + check_col VARCHAR(255) CHECK (char_length(check_col) > 3) +); +``` + +-- all index types: +``` +CREATE TABLE struct_it_pg2pg_2.full_index_type ( + id SERIAL PRIMARY KEY, + unique_col VARCHAR(255) NOT NULL, + index_col VARCHAR(255), + fulltext_col TSVECTOR, + spatial_col POINT NOT NULL, + simple_index_col VARCHAR(255), + composite_index_col1 VARCHAR(255), + composite_index_col2 
VARCHAR(255), + composite_index_col3 VARCHAR(255) +); +``` + +CREATE UNIQUE INDEX unique_index ON struct_it_pg2pg_2.full_index_type (unique_col); + +CREATE INDEX index_index ON struct_it_pg2pg_2.full_index_type (index_col); + +CREATE INDEX fulltext_index ON struct_it_pg2pg_2.full_index_type USING gin(fulltext_col); + +CREATE INDEX spatial_index ON struct_it_pg2pg_2.full_index_type USING gist(spatial_col); + +CREATE INDEX simple_index ON struct_it_pg2pg_2.full_index_type (simple_index_col); + +``` +CREATE INDEX composite_index ON struct_it_pg2pg_2.full_index_type ( + composite_index_col1, composite_index_col2, + composite_index_col3 +); +``` + +-- table comments: +COMMENT ON TABLE struct_it_pg2pg_2.full_column_type IS 'Comment on full_column_type.'; +COMMENT ON TABLE struct_it_pg2pg_2.full_index_type IS 'Comment on full_index_type.'; + +-- column comments: +COMMENT ON COLUMN struct_it_pg2pg_2.full_column_type.id IS 'Comment on full_column_type.id.'; +COMMENT ON COLUMN struct_it_pg2pg_2.full_index_type.id IS 'Comment on full_index_type.id.'; + +-- sequences + +-- case 1: sequeces created automatically when creating table +CREATE TABLE struct_it_pg2pg_2.sequence_test_1 (seq_1 SERIAL, seq_2 BIGSERIAL, seq_3 SMALLSERIAL); + +-- case 2: create independent sequences, then alter their owners +CREATE SEQUENCE struct_it_pg2pg_2.sequence_test_2_seq_1; +CREATE SEQUENCE struct_it_pg2pg_2.sequence_test_2_seq_2; +CREATE SEQUENCE struct_it_pg2pg_2.sequence_test_2_seq_3; + +CREATE TABLE struct_it_pg2pg_2.sequence_test_2 (seq_1 INTEGER, seq_2 BIGINT, seq_3 SMALLINT); + +-- in postgres, sequence must be in same schema as table it is linked to +-- actually, postgres allows mutiple sequences owned by the same table.column, here we just ignore +ALTER SEQUENCE struct_it_pg2pg_2.sequence_test_2_seq_1 OWNED BY struct_it_pg2pg_2.sequence_test_2.seq_1; +ALTER SEQUENCE struct_it_pg2pg_2.sequence_test_2_seq_2 OWNED BY struct_it_pg2pg_2.sequence_test_2.seq_2; +ALTER SEQUENCE struct_it_pg2pg_2.sequence_test_2_seq_3 OWNED BY struct_it_pg2pg_2.sequence_test_2.seq_3; + +-- case 3: create independent sequences, use them in column defaults without ownership +-- we should migrate these sequences +CREATE SEQUENCE struct_it_pg2pg_2.sequence_test_3_seq_2; +CREATE SEQUENCE struct_it_pg2pg_2."sequence_test_3_seq.\d@_3"; + +``` +CREATE TABLE struct_it_pg2pg_2.sequence_test_3 ( + seq_1 SERIAL, + seq_2 BIGINT DEFAULT nextval('struct_it_pg2pg_2.sequence_test_3_seq_2'), + seq_3 SMALLINT DEFAULT nextval('struct_it_pg2pg_2."sequence_test_3_seq.\d@_3"') +); +``` + +-- case 4: create independent sequences and never used by any tables +-- we should not migrate them +CREATE SEQUENCE struct_it_pg2pg_2.sequence_test_4_seq_1; + +-- test view filtered +CREATE VIEW struct_it_pg2pg_2.full_column_type_view AS SELECT * FROM struct_it_pg2pg_2.full_column_type; + +-- special character +``` +CREATE TABLE struct_it_pg2pg_2."special_character_$1#@*_table" ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + unique_col VARCHAR(255) UNIQUE, + not_null_col VARCHAR(255) NOT NULL, + check_col VARCHAR(255) CHECK (char_length(check_col) > 3) +); +``` + +drop schema if exists struct_it_pg2pg_3 CASCADE; + +create schema struct_it_pg2pg_3; + +-- all basic column types: +``` +CREATE TABLE struct_it_pg2pg_3.full_column_type ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + char_col CHAR(10), + text_col TEXT, + boolean_col BOOLEAN, + smallint_col SMALLINT, + integer_col INTEGER, + bigint_col BIGINT, + decimal_col DECIMAL(10, 2), + 
numeric_col NUMERIC(10, 2), + real_col REAL, + double_precision_col DOUBLE PRECISION, + date_col DATE, + time_col TIME, + timestamp_col TIMESTAMP, + interval_col INTERVAL, + bytea_col BYTEA, + uuid_col UUID, + xml_col XML, + json_col JSON, + jsonb_col JSONB, + point_col POINT, + line_col LINE, + lseg_col LSEG, + box_col BOX, + path_col PATH, + polygon_col POLYGON, + circle_col CIRCLE +); +``` + +-- array column types: +``` +CREATE TABLE struct_it_pg2pg_3.array_table ( + pk SERIAL, + int_array INT[], + bigint_array BIGINT[], + text_array TEXT[], + char_array CHAR(10) [], + varchar_array VARCHAR(10) [], + date_array DATE[], + numeric_array NUMERIC(10, 2) [], + varnumeric_array NUMERIC[3], + inet_array INET[], + cidr_array CIDR[], + macaddr_array MACADDR[], + tsrange_array TSRANGE[], + tstzrange_array TSTZRANGE[], + daterange_array DATERANGE[], + int4range_array INT4RANGE[], + numerange_array NUMRANGE[], + int8range_array INT8RANGE[], + uuid_array UUID[], + json_array json[], + jsonb_array jsonb[], + oid_array OID[], + PRIMARY KEY(pk) +); +``` + +-- all check types(without fk and exclude): +``` +CREATE TABLE struct_it_pg2pg_3.full_constraint_type ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + unique_col VARCHAR(255) UNIQUE, + not_null_col VARCHAR(255) NOT NULL, + check_col VARCHAR(255) CHECK (char_length(check_col) > 3) +); +``` + +-- all index types: +``` +CREATE TABLE struct_it_pg2pg_3.full_index_type ( + id SERIAL PRIMARY KEY, + unique_col VARCHAR(255) NOT NULL, + index_col VARCHAR(255), + fulltext_col TSVECTOR, + spatial_col POINT NOT NULL, + simple_index_col VARCHAR(255), + composite_index_col1 VARCHAR(255), + composite_index_col2 VARCHAR(255), + composite_index_col3 VARCHAR(255) +); +``` + +CREATE UNIQUE INDEX unique_index ON struct_it_pg2pg_3.full_index_type (unique_col); + +CREATE INDEX index_index ON struct_it_pg2pg_3.full_index_type (index_col); + +CREATE INDEX fulltext_index ON struct_it_pg2pg_3.full_index_type USING gin(fulltext_col); + +CREATE INDEX spatial_index ON struct_it_pg2pg_3.full_index_type USING gist(spatial_col); + +CREATE INDEX simple_index ON struct_it_pg2pg_3.full_index_type (simple_index_col); + +``` +CREATE INDEX composite_index ON struct_it_pg2pg_3.full_index_type ( + composite_index_col1, composite_index_col2, + composite_index_col3 +); +``` + +-- table comments: +COMMENT ON TABLE struct_it_pg2pg_3.full_column_type IS 'Comment on full_column_type.'; +COMMENT ON TABLE struct_it_pg2pg_3.full_index_type IS 'Comment on full_index_type.'; + +-- column comments: +COMMENT ON COLUMN struct_it_pg2pg_3.full_column_type.id IS 'Comment on full_column_type.id.'; +COMMENT ON COLUMN struct_it_pg2pg_3.full_index_type.id IS 'Comment on full_index_type.id.'; + +-- sequences + +-- case 1: sequeces created automatically when creating table +CREATE TABLE struct_it_pg2pg_3.sequence_test_1 (seq_1 SERIAL, seq_2 BIGSERIAL, seq_3 SMALLSERIAL); + +-- case 2: create independent sequences, then alter their owners +CREATE SEQUENCE struct_it_pg2pg_3.sequence_test_2_seq_1; +CREATE SEQUENCE struct_it_pg2pg_3.sequence_test_2_seq_2; +CREATE SEQUENCE struct_it_pg2pg_3.sequence_test_2_seq_3; + +CREATE TABLE struct_it_pg2pg_3.sequence_test_2 (seq_1 INTEGER, seq_2 BIGINT, seq_3 SMALLINT); + +-- in postgres, sequence must be in same schema as table it is linked to +-- actually, postgres allows mutiple sequences owned by the same table.column, here we just ignore +ALTER SEQUENCE struct_it_pg2pg_3.sequence_test_2_seq_1 OWNED BY struct_it_pg2pg_3.sequence_test_2.seq_1; +ALTER SEQUENCE 
struct_it_pg2pg_3.sequence_test_2_seq_2 OWNED BY struct_it_pg2pg_3.sequence_test_2.seq_2; +ALTER SEQUENCE struct_it_pg2pg_3.sequence_test_2_seq_3 OWNED BY struct_it_pg2pg_3.sequence_test_2.seq_3; + +-- case 3: create independent sequences, use them in column defaults without ownership +-- we should migrate these sequences +CREATE SEQUENCE struct_it_pg2pg_3.sequence_test_3_seq_2; +CREATE SEQUENCE struct_it_pg2pg_3."sequence_test_3_seq.\d@_3"; + +``` +CREATE TABLE struct_it_pg2pg_3.sequence_test_3 ( + seq_1 SERIAL, + seq_2 BIGINT DEFAULT nextval('struct_it_pg2pg_3.sequence_test_3_seq_2'), + seq_3 SMALLINT DEFAULT nextval('struct_it_pg2pg_3."sequence_test_3_seq.\d@_3"') +); +``` + +-- case 4: create independent sequences and never used by any tables +-- we should not migrate them +CREATE SEQUENCE struct_it_pg2pg_3.sequence_test_4_seq_1; + +-- test view filtered +CREATE VIEW struct_it_pg2pg_3.full_column_type_view AS SELECT * FROM struct_it_pg2pg_3.full_column_type; + +-- special character +``` +CREATE TABLE struct_it_pg2pg_3."special_character_$1#@*_table" ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + unique_col VARCHAR(255) UNIQUE, + not_null_col VARCHAR(255) NOT NULL, + check_col VARCHAR(255) CHECK (char_length(check_col) > 3) +); +``` diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini index ca60b1b7..6d70d670 100644 --- a/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini @@ -13,7 +13,7 @@ conflict_policy=interrupt # conflict_policy=ignore [filter] -do_dbs=struct_it_pg2pg_1 +do_dbs=struct_it_pg2pg_* ignore_dbs= do_tbs= ignore_tbs= From 059191dbc148d5aa6f72163679da27722cb8d570 Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Tue, 16 Sep 2025 14:13:51 +0800 Subject: [PATCH 05/17] fix --- dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini index 6d70d670..116c89b1 100644 --- a/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini @@ -9,8 +9,8 @@ sink_type=struct db_type=pg batch_size=1 url={pg_sinker_url} -conflict_policy=interrupt -# conflict_policy=ignore +# conflict_policy=interrupt +conflict_policy=ignore [filter] do_dbs=struct_it_pg2pg_* @@ -18,7 +18,7 @@ ignore_dbs= do_tbs= ignore_tbs= do_events= -do_structures=database,table,constraint,sequence,comment,index +# do_structures=database,table,constraint,sequence,comment,index [router] db_map= From 90f70d5c70fd126d12102d5bb43632e000d7692f Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Thu, 18 Sep 2025 16:27:10 +0800 Subject: [PATCH 06/17] add task_parallel_size --- dt-common/src/config/extractor_config.rs | 2 - dt-common/src/config/runtime_config.rs | 3 + dt-common/src/config/task_config.rs | 5 +- dt-task/src/task_runner.rs | 366 ++++++++---------- .../struct/batch_test/task_config.ini | 2 + 5 files changed, 179 insertions(+), 199 deletions(-) diff --git a/dt-common/src/config/extractor_config.rs b/dt-common/src/config/extractor_config.rs index 80af8e28..710ddb11 100644 --- a/dt-common/src/config/extractor_config.rs +++ b/dt-common/src/config/extractor_config.rs @@ -9,7 +9,6 @@ pub enum ExtractorConfig { url: String, db: String, dbs: Vec, - batch_size: usize, }, PgStruct { @@ 
-17,7 +16,6 @@ pub enum ExtractorConfig { schema: String, schemas: Vec, do_global_structs: bool, - batch_size: usize, }, MysqlSnapshot { diff --git a/dt-common/src/config/runtime_config.rs b/dt-common/src/config/runtime_config.rs index 00fc20f5..0edfb94a 100644 --- a/dt-common/src/config/runtime_config.rs +++ b/dt-common/src/config/runtime_config.rs @@ -4,4 +4,7 @@ pub struct RuntimeConfig { pub log_dir: String, pub log4rs_file: String, pub tb_parallel_size: usize, + pub task_parallel_size: usize, + pub tb_batch_size: usize, + pub db_batch_size: usize, } diff --git a/dt-common/src/config/task_config.rs b/dt-common/src/config/task_config.rs index 5a3ca12a..226031dc 100644 --- a/dt-common/src/config/task_config.rs +++ b/dt-common/src/config/task_config.rs @@ -177,7 +177,6 @@ impl TaskConfig { url, db: String::new(), dbs: Vec::new(), - batch_size: loader.get_with_default(EXTRACTOR, BATCH_SIZE, 1), }, ExtractType::FoxlakeS3 => { @@ -237,7 +236,6 @@ impl TaskConfig { schema: String::new(), schemas: Vec::new(), do_global_structs: false, - batch_size: loader.get_with_default(EXTRACTOR, BATCH_SIZE, 1), }, _ => bail! { not_supported_err }, @@ -611,6 +609,9 @@ impl TaskConfig { "./log4rs.yaml".to_string(), ), tb_parallel_size: loader.get_with_default(RUNTIME, "tb_parallel_size", 1), + task_parallel_size: loader.get_with_default(RUNTIME, "task_parallel_size", 1), + tb_batch_size: loader.get_with_default(RUNTIME, "tb_batch_size", 1), + db_batch_size: loader.get_with_default(RUNTIME, "db_batch_size", 1), }) } diff --git a/dt-task/src/task_runner.rs b/dt-task/src/task_runner.rs index 8219eda6..8bb7baa6 100644 --- a/dt-task/src/task_runner.rs +++ b/dt-task/src/task_runner.rs @@ -57,6 +57,15 @@ use dt_pipeline::{ #[cfg(feature = "metrics")] use dt_common::monitor::prometheus_metrics::PrometheusMetrics; +#[derive(Clone)] +pub struct TaskContext<'a> { + pub id: String, + pub extractor_config: ExtractorConfig, + pub router: &'a RdbRouter, + pub snapshot_resumer: &'a SnapshotResumer, + pub cdc_resumer: &'a CdcResumer, +} + #[derive(Clone)] pub struct TaskRunner { config: TaskConfig, @@ -165,111 +174,9 @@ impl TaskRunner { snapshot_resumer: &SnapshotResumer, cdc_resumer: &CdcResumer, ) -> anyhow::Result<()> { - let db_type = &self.config.extractor_basic.db_type; - let filter = RdbFilter::from_config(&self.config.filter, db_type)?; - let task_type_option = build_task_type( - &self.config.extractor_basic.extract_type, - &self.config.sinker_basic.sink_type, - ); - - let mut pending_tbs = VecDeque::new(); - let schemas = TaskUtil::list_schemas(url, db_type) - .await? - .iter() - .filter(|schema| !filter.filter_schema(schema)) - .map(|s| s.to_owned()) - .collect::>(); - if let Some(task_type) = task_type_option { - let record_count = - TaskUtil::estimate_record_count(&task_type, url, db_type, &schemas, &filter) - .await?; - self.task_monitor - .add_no_window_metrics(TaskMetricsType::ExtractorPlanRecords, record_count); - } - let struct_batch_size = match &self.config.extractor { - ExtractorConfig::MysqlStruct { batch_size, .. } - | ExtractorConfig::PgStruct { batch_size, .. } => Some(*batch_size), - _ => None, - }; - if let Some(struct_batch_size) = struct_batch_size { - if struct_batch_size == 0 { - bail!("batch_size must be greater than 0") - } - let schema_chunks: Vec> = schemas - .chunks(struct_batch_size) - .map(|chunk| chunk.to_vec()) - .collect(); - let extractor_configs: Vec = match &self.config.extractor { - ExtractorConfig::MysqlStruct { - url, - db, - batch_size, - .. 
- } => { - let mut extractor_configs = Vec::new(); - for schema_chunk in schema_chunks.iter() { - let extractor_config = ExtractorConfig::MysqlStruct { - url: url.clone(), - db: db.clone(), - dbs: schema_chunk.clone(), - batch_size: *batch_size, - }; - extractor_configs.push(extractor_config); - } - extractor_configs - } - ExtractorConfig::PgStruct { - url, - schema, - batch_size, - .. - } => { - let mut extractor_configs = Vec::new(); - for (flag, schema_chunk) in schema_chunks.iter().enumerate() { - let extractor_config = ExtractorConfig::PgStruct { - url: url.clone(), - schema: schema.clone(), - schemas: schema_chunk.clone(), - do_global_structs: flag == 0, - batch_size: *batch_size, - }; - extractor_configs.push(extractor_config); - } - extractor_configs - } - _ => vec![], - }; - for extractor_config in &extractor_configs { - self.clone() - .start_single_task( - extractor_config, - router, - snapshot_resumer, - cdc_resumer, - true, - ) - .await?; - } - } else { - // TODO: Need to limit resources when starting tasks concurrently at schema level. - // Currently connection count, rate limit, buffer size, etc. are controlled at single task level, - // which in multi-task mode will amplify these resources by at least schema count times - for schema in schemas.iter() { - // find pending tables - let tbs = TaskUtil::list_tbs(url, schema, db_type).await?; - for tb in tbs.iter() { - if snapshot_resumer.check_finished(schema, tb) { - log_info!("schema: {}, tb: {}, already finished", schema, tb); - continue; - } - if filter.filter_event(schema, tb, &RowType::Insert) { - log_info!("schema: {}, tb: {}, insert events filtered", schema, tb); - continue; - } - pending_tbs.push_back((schema.to_owned(), tb.to_owned())); - } - } - } + let mut pending_tasks = self + .build_pending_tasks(url, router, snapshot_resumer, cdc_resumer) + .await?; // start a thread to flush global monitors let global_shut_down = Arc::new(AtomicBool::new(false)); @@ -289,24 +196,15 @@ impl TaskRunner { .await }); - // process all tables in parallel - let tb_parallel_size = self.config.runtime.tb_parallel_size; - let semaphore = Arc::new(tokio::sync::Semaphore::new(tb_parallel_size)); + let task_parallel_size = self.config.runtime.task_parallel_size; + let semaphore = Arc::new(tokio::sync::Semaphore::new(task_parallel_size)); let mut join_set: JoinSet<(String, anyhow::Result<()>)> = JoinSet::new(); // initialize the task pool to its maximum capacity - while join_set.len() < tb_parallel_size && !pending_tbs.is_empty() { - if let Some((schema, tb)) = pending_tbs.pop_front() { + while join_set.len() < task_parallel_size && !pending_tasks.is_empty() { + if let Some(task_context) = pending_tasks.pop_front() { self.clone() - .spawn_single_task( - &schema, - &tb, - router, - snapshot_resumer, - cdc_resumer, - &mut join_set, - &semaphore, - ) + .spawn_single_task(task_context, &mut join_set, &semaphore) .await?; } } @@ -315,17 +213,9 @@ impl TaskRunner { while let Some(result) = join_set.join_next().await { match result { Ok((_, Ok(()))) => { - if let Some((schema, tb)) = pending_tbs.pop_front() { + if let Some(task_context) = pending_tasks.pop_front() { self.clone() - .spawn_single_task( - &schema, - &tb, - router, - snapshot_resumer, - cdc_resumer, - &mut join_set, - &semaphore, - ) + .spawn_single_task(task_context, &mut join_set, &semaphore) .await?; } } @@ -345,81 +235,22 @@ impl TaskRunner { async fn spawn_single_task( self, - schema: &str, - tb: &str, - router: &RdbRouter, - snapshot_resumer: &SnapshotResumer, - cdc_resumer: 
&CdcResumer, + task_context: TaskContext<'_>, join_set: &mut JoinSet<(String, anyhow::Result<()>)>, semaphore: &Arc, ) -> anyhow::Result<()> { - let tb_extractor_config = match &self.config.extractor { - ExtractorConfig::MysqlSnapshot { - url, - sample_interval, - parallel_size, - batch_size, - .. - } => ExtractorConfig::MysqlSnapshot { - url: url.clone(), - db: schema.into(), - tb: tb.into(), - sample_interval: *sample_interval, - parallel_size: *parallel_size, - batch_size: *batch_size, - }, - - ExtractorConfig::PgSnapshot { - url, - sample_interval, - batch_size, - .. - } => ExtractorConfig::PgSnapshot { - url: url.clone(), - schema: schema.into(), - tb: tb.into(), - sample_interval: *sample_interval, - batch_size: *batch_size, - }, - - ExtractorConfig::MongoSnapshot { url, app_name, .. } => { - ExtractorConfig::MongoSnapshot { - url: url.clone(), - app_name: app_name.clone(), - db: schema.into(), - tb: tb.into(), - } - } - - ExtractorConfig::FoxlakeS3 { - url, - s3_config, - batch_size, - .. - } => ExtractorConfig::FoxlakeS3 { - url: url.into(), - schema: schema.into(), - tb: tb.into(), - s3_config: s3_config.clone(), - batch_size: *batch_size, - }, - - _ => { - bail! {Error::ConfigError("unsupported extractor config".into())}; - } - }; - - let single_task_id = format!("{}.{}", schema, tb); - let router = router.clone(); - let snapshot_resumer = snapshot_resumer.clone(); - let cdc_resumer = cdc_resumer.clone(); + let single_task_id = task_context.id; + let extractor_config = task_context.extractor_config; + let router = task_context.router.clone(); + let snapshot_resumer = task_context.snapshot_resumer.clone(); + let cdc_resumer = task_context.cdc_resumer.clone(); let semaphore = Arc::clone(semaphore); let me = self.clone(); join_set.spawn(async move { let _permit = semaphore.acquire().await.unwrap(); let res = me .start_single_task( - &tb_extractor_config, + &extractor_config, &router, &snapshot_resumer, &cdc_resumer, @@ -930,4 +761,149 @@ impl TaskRunner { } Ok(()) } + + async fn build_pending_tasks<'a>( + &self, + url: &str, + router: &'a RdbRouter, + snapshot_resumer: &'a SnapshotResumer, + cdc_resumer: &'a CdcResumer, + ) -> anyhow::Result>> { + let db_type = &self.config.extractor_basic.db_type; + let filter = RdbFilter::from_config(&self.config.filter, db_type)?; + let task_type_option = build_task_type( + &self.config.extractor_basic.extract_type, + &self.config.sinker_basic.sink_type, + ); + + let schemas = TaskUtil::list_schemas(url, db_type) + .await? + .iter() + .filter(|schema| !filter.filter_schema(schema)) + .map(|s| s.to_owned()) + .collect::>(); + if let Some(task_type) = task_type_option { + let record_count = + TaskUtil::estimate_record_count(&task_type, url, db_type, &schemas, &filter) + .await?; + self.task_monitor + .add_no_window_metrics(TaskMetricsType::ExtractorPlanRecords, record_count); + } + let mut pending_tasks = VecDeque::new(); + let db_batch_size = match &self.config.extractor { + ExtractorConfig::MysqlStruct { .. } | ExtractorConfig::PgStruct { .. } => { + Some(self.config.runtime.db_batch_size) + } + _ => None, + }; + if let Some(db_batch_size) = db_batch_size { + let schema_chunks: Vec> = schemas + .chunks(db_batch_size) + .map(|chunk| chunk.to_vec()) + .collect(); + for (flag, schema_chunk) in schema_chunks.iter().enumerate() { + let db_extractor_config = match &self.config.extractor { + ExtractorConfig::MysqlStruct { url, db, .. 
} => ExtractorConfig::MysqlStruct { + url: url.clone(), + db: db.clone(), + dbs: schema_chunk.clone(), + }, + ExtractorConfig::PgStruct { url, schema, .. } => ExtractorConfig::PgStruct { + url: url.clone(), + schema: schema.clone(), + schemas: schema_chunk.clone(), + do_global_structs: flag == 0, + }, + _ => { + bail! {Error::ConfigError("unsupported extractor config for `runtime.db_batch_size`".into())} + } + }; + pending_tasks.push_back(TaskContext { + extractor_config: db_extractor_config, + router: router, + snapshot_resumer: snapshot_resumer, + cdc_resumer: cdc_resumer, + id: schema_chunk.join(","), + }); + } + } else { + for schema in schemas.iter() { + // find pending tables + let tbs = TaskUtil::list_tbs(url, schema, db_type).await?; + for tb in tbs.iter() { + if snapshot_resumer.check_finished(schema, tb) { + log_info!("schema: {}, tb: {}, already finished", schema, tb); + continue; + } + if filter.filter_event(schema, tb, &RowType::Insert) { + log_info!("schema: {}, tb: {}, insert events filtered", schema, tb); + continue; + } + let tb_extractor_config = match &self.config.extractor { + ExtractorConfig::MysqlSnapshot { + url, + sample_interval, + parallel_size, + batch_size, + .. + } => ExtractorConfig::MysqlSnapshot { + url: url.clone(), + db: schema.clone(), + tb: tb.clone(), + sample_interval: *sample_interval, + parallel_size: *parallel_size, + batch_size: *batch_size, + }, + + ExtractorConfig::PgSnapshot { + url, + sample_interval, + batch_size, + .. + } => ExtractorConfig::PgSnapshot { + url: url.clone(), + schema: schema.clone(), + tb: tb.clone(), + sample_interval: *sample_interval, + batch_size: *batch_size, + }, + + ExtractorConfig::MongoSnapshot { url, app_name, .. } => { + ExtractorConfig::MongoSnapshot { + url: url.clone(), + app_name: app_name.clone(), + db: schema.clone(), + tb: tb.clone(), + } + } + + ExtractorConfig::FoxlakeS3 { + url, + s3_config, + batch_size, + .. + } => ExtractorConfig::FoxlakeS3 { + url: url.clone(), + schema: schema.clone(), + tb: tb.clone(), + s3_config: s3_config.clone(), + batch_size: *batch_size, + }, + + _ => { + bail! 
{Error::ConfigError("unsupported extractor config for `runtime.task_parallel_size`".into())}; + } + }; + pending_tasks.push_back(TaskContext { + extractor_config: tb_extractor_config, + router: router, + snapshot_resumer: snapshot_resumer, + cdc_resumer: cdc_resumer, + id: format!("{}.{}", schema, tb), + }); + } + } + } + Ok(pending_tasks) + } } diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini index 116c89b1..7b2612c9 100644 --- a/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini @@ -29,6 +29,8 @@ col_map= log_level=info log4rs_file=./log4rs.yaml log_dir=./logs +task_parallel_size=2 +db_batch_size=2 [parallelizer] parallel_type=serial From 4d692cd0a01d051fffaf2badf3ce5738440e5df2 Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Fri, 19 Sep 2025 19:44:19 +0800 Subject: [PATCH 07/17] fix pg struct fetcher --- .../src/extractor/pg/pg_struct_extractor.rs | 27 +- .../src/meta_fetcher/pg/pg_struct_fetcher.rs | 357 +++++++++++++----- dt-connector/src/sinker/pg/pg_checker.rs | 26 +- dt-task/src/task_runner.rs | 3 + dt-task/src/task_util.rs | 5 + 5 files changed, 293 insertions(+), 125 deletions(-) diff --git a/dt-connector/src/extractor/pg/pg_struct_extractor.rs b/dt-connector/src/extractor/pg/pg_struct_extractor.rs index d300e581..40957187 100644 --- a/dt-connector/src/extractor/pg/pg_struct_extractor.rs +++ b/dt-connector/src/extractor/pg/pg_struct_extractor.rs @@ -29,10 +29,7 @@ impl Extractor for PgStructExtractor { "PgStructExtractor starts, schemas: {}", self.schemas.join(",") ); - for (flag, schema) in self.schemas.clone().iter().enumerate() { - self.extract_internal(schema, self.do_global_structs && flag == 0) - .await?; - } + self.extract_internal().await?; self.base_extractor.wait_task_finish().await } @@ -42,29 +39,27 @@ impl Extractor for PgStructExtractor { } impl PgStructExtractor { - pub async fn extract_internal( - &mut self, - schema: &String, - do_global_structs: bool, - ) -> anyhow::Result<()> { + pub async fn extract_internal(&mut self) -> anyhow::Result<()> { let mut pg_fetcher = PgStructFetcher { conn_pool: self.conn_pool.to_owned(), - schema: schema.clone(), + schema: self.schema.clone(), + schemas: self.schemas.iter().cloned().collect(), filter: Some(self.filter.to_owned()), }; - // schema - let schema_statement = pg_fetcher.get_create_schema_statement().await?; - self.push_dt_data(StructStatement::PgCreateSchema(schema_statement)) - .await?; + // schemas + for schema_statement in pg_fetcher.get_create_schema_statements("").await? { + self.push_dt_data(StructStatement::PgCreateSchema(schema_statement)) + .await?; + } // tables - for table_statement in pg_fetcher.get_create_table_statements("").await? { + for table_statement in pg_fetcher.get_create_table_statements("", "").await? 
{ self.push_dt_data(StructStatement::PgCreateTable(table_statement)) .await?; } - if do_global_structs && !self.filter.filter_structure(&StructureType::Rbac) { + if self.do_global_structs && !self.filter.filter_structure(&StructureType::Rbac) { // do rbac init let rbac_statements = pg_fetcher.get_create_rbac_statements().await?; for statement in rbac_statements { diff --git a/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs b/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs index daa532ab..766621c8 100644 --- a/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs +++ b/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs @@ -34,6 +34,7 @@ use super::pg_struct_check_fetcher::PgStructCheckFetcher; pub struct PgStructFetcher { pub conn_pool: Pool, pub schema: String, + pub schemas: HashSet, pub filter: Option, } @@ -43,35 +44,42 @@ enum ColType { } impl PgStructFetcher { - pub async fn get_create_schema_statement(&mut self) -> anyhow::Result { - let schema = self.get_schema().await?; - Ok(PgCreateSchemaStatement { schema }) + pub async fn get_create_schema_statements( + &mut self, + sch: &str, + ) -> anyhow::Result> { + let schemas = self.get_schemas(sch).await?; + Ok(schemas + .into_iter() + .map(|s| PgCreateSchemaStatement { schema: s }) + .collect()) } pub async fn get_create_table_statements( &mut self, + sch: &str, tb: &str, ) -> anyhow::Result> { let mut results = Vec::new(); - let tables = self.get_tables(tb).await?; - let mut sequences = self.get_sequences(tb).await?; - let mut sequence_owners = self.get_sequence_owners(tb).await?; - let mut constraints = self.get_constraints(tb).await?; - let mut indexes = self.get_indexes(tb).await?; - let mut column_comments = self.get_column_comments(tb).await?; - let mut table_comments = self.get_table_comments(tb).await?; + let tables = self.get_tables(sch, tb).await?; + let mut sequences = self.get_sequences(sch, tb).await?; + let mut sequence_owners = self.get_sequence_owners(sch, tb).await?; + let mut constraints = self.get_constraints(sch, tb).await?; + let mut indexes = self.get_indexes(sch, tb).await?; + let mut column_comments = self.get_column_comments(sch, tb).await?; + let mut table_comments = self.get_table_comments(sch, tb).await?; - for (table_name, table) in tables { + for (schema_table_name, table) in tables { let table_sequences = self.get_table_sequences(&table, &mut sequences).await?; let statement = PgCreateTableStatement { table, sequences: table_sequences, - sequence_owners: self.get_result(&mut sequence_owners, &table_name), - constraints: self.get_result(&mut constraints, &table_name), - indexes: self.get_result(&mut indexes, &table_name), - column_comments: self.get_result(&mut column_comments, &table_name), - table_comments: self.get_result(&mut table_comments, &table_name), + sequence_owners: self.get_result(&mut sequence_owners, &schema_table_name), + constraints: self.get_result(&mut constraints, &schema_table_name), + indexes: self.get_result(&mut indexes, &schema_table_name), + column_comments: self.get_result(&mut column_comments, &schema_table_name), + table_comments: self.get_result(&mut table_comments, &schema_table_name), }; results.push(statement); } @@ -91,34 +99,82 @@ impl PgStructFetcher { }]) } - async fn get_schema(&mut self) -> anyhow::Result { + async fn get_schemas(&mut self, sch: &str) -> anyhow::Result> { + let (sch_filter, target_schemas) = if !sch.is_empty() { + if !self.schemas.contains(sch) { + return Ok(Vec::new()); + } + ( + format!("schema_name = '{}'", sch), + 
HashSet::from([sch.to_string()]), + ) + } else if !self.schemas.is_empty() { + ( + format!("schema_name in ({})", self.get_schemas_str()), + self.schemas.clone(), + ) + } else { + return Ok(Vec::new()); + }; + let sql = format!( "SELECT schema_name FROM information_schema.schemata - WHERE schema_name='{}'", - self.schema + WHERE {}", + sch_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); + let mut schemas = HashSet::new(); if let Some(row) = rows.try_next().await? { let schema_name = Self::get_str_with_null(&row, "schema_name")?; - let schema = Schema { name: schema_name }; - return Ok(schema); + schemas.insert(schema_name); + } + let filtered_schemas: Vec = target_schemas + .iter() + .filter(|&s| !schemas.contains(s)) + .cloned() + .collect(); + if !filtered_schemas.is_empty() { + bail! {Error::StructError(format!( + "schemas: {} not found", + filtered_schemas.join(",") + ))} + } else { + Ok(schemas.into_iter().map(|s| Schema { name: s }).collect()) } - - bail! {Error::StructError(format!( - "schema: {} not found", - self.schema - ))} } - async fn get_sequences(&mut self, tb: &str) -> anyhow::Result>> { - let mut results: HashMap> = HashMap::new(); + async fn get_sequences( + &mut self, + sch: &str, + tb: &str, + ) -> anyhow::Result>> { + let mut results = HashMap::new(); - let tb_filter = if !tb.is_empty() { - format!("AND tab.relname = '{}'", tb) + let (sch_filter, tb_filter) = if !sch.is_empty() { + if !self.schemas.contains(sch) { + return Ok(results); + } + if !tb.is_empty() { + ( + format!("ns.nspname = '{}'", sch), + format!("obj.sequence_schema='{}' AND tab.relname = '{}'", sch, tb), + ) + } else { + ( + format!("ns.nspname = '{}'", sch), + format!("obj.sequence_schema = '{}'", sch), + ) + } + } else if !self.schemas.is_empty() { + let schemas_str = &self.get_schemas_str(); + ( + format!("ns.nspname in ({})", schemas_str), + format!("obj.sequence_schema in ({})", schemas_str), + ) } else { - String::new() + return Ok(results); }; let sql = format!( @@ -141,10 +197,10 @@ impl PgStructFetcher { ON (seq.oid = dep.objid) JOIN pg_class AS tab ON (dep.refobjid = tab.oid) - WHERE ns.nspname='{}' - AND obj.sequence_schema='{}' {} + WHERE {} + AND {} AND dep.deptype='a'", - &self.schema, &self.schema, tb_filter + sch_filter, tb_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); @@ -158,7 +214,7 @@ impl PgStructFetcher { let sequence = Sequence { sequence_name, database_name: Self::get_str_with_null(&row, "sequence_catalog")?, - schema_name: sequence_schema, + schema_name: sequence_schema.clone(), data_type: Self::get_str_with_null(&row, "data_type")?, start_value: row.get("start_value"), increment: row.get("increment"), @@ -166,7 +222,7 @@ impl PgStructFetcher { maximum_value: row.get("maximum_value"), cycle_option: Self::get_str_with_null(&row, "cycle_option")?, }; - self.push_to_results(&mut results, &table_name, sequence); + self.push_to_results(&mut results, &sequence_schema, &table_name, sequence); } Ok(results) @@ -175,6 +231,7 @@ impl PgStructFetcher { async fn get_independent_sequences( &mut self, sequence_names: &[String], + table_schema: &str, ) -> anyhow::Result> { let filter_names: Vec = sequence_names.iter().map(|i| format!("'{}'", i)).collect(); let filter = format!("AND sequence_name IN ({})", filter_names.join(",")); @@ -182,7 +239,7 @@ impl PgStructFetcher { "SELECT * FROM information_schema.sequences WHERE sequence_schema='{}' {}", - self.schema, filter + table_schema, filter ); let mut results = Vec::new(); @@ -208,9 +265,12 @@ impl 
PgStructFetcher { async fn get_table_sequences( &mut self, table: &Table, - sequences: &mut HashMap>, + sequences: &mut HashMap<(String, String), Vec>, ) -> anyhow::Result> { - let mut table_sequences = self.get_result(sequences, &table.table_name); + let mut table_sequences = self.get_result( + sequences, + &(table.schema_name.clone(), table.table_name.clone()), + ); let mut owned_sequence_names = HashSet::new(); for sequence in table_sequences.iter() { @@ -235,7 +295,7 @@ impl PgStructFetcher { } // sequence and table should be in the same schema, otherwise we don't support - if !schema.is_empty() && schema != self.schema { + if !schema.is_empty() && schema != table.schema_name { log_error!( "table: {}.{} is using sequence: {}.{} from a different schema", table.schema_name, @@ -263,7 +323,7 @@ impl PgStructFetcher { if !independent_sequence_names.is_empty() { let independent_squences = self - .get_independent_sequences(&independent_sequence_names) + .get_independent_sequences(&independent_sequence_names, &table.schema_name) .await?; table_sequences.extend_from_slice(&independent_squences); } @@ -273,14 +333,24 @@ impl PgStructFetcher { async fn get_sequence_owners( &mut self, + sch: &str, tb: &str, - ) -> anyhow::Result>> { + ) -> anyhow::Result>> { let mut results = HashMap::new(); - let tb_filter = if !tb.is_empty() { - format!("AND tab.relname = '{}'", tb) + let tb_filter = if !sch.is_empty() { + if !self.schemas.contains(sch) { + return Ok(results); + } + if !tb.is_empty() { + format!("ns.nspname='{}' AND tab.relname = '{}'", sch, tb) + } else { + format!("ns.nspname = '{}'", sch) + } + } else if !self.schemas.is_empty() { + format!("ns.nspname in ({})", self.get_schemas_str()) } else { - String::new() + return Ok(results); }; let sql = format!( @@ -299,8 +369,8 @@ impl PgStructFetcher { ON (attr.attnum = dep.refobjsubid AND attr.attrelid = dep.refobjid) WHERE dep.deptype='a' AND seq.relkind='S' - AND ns.nspname = '{}' {}", - &self.schema, tb_filter + AND {}", + tb_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); @@ -315,23 +385,36 @@ impl PgStructFetcher { let sequence_owner = SequenceOwner { sequence_name: seq_name, database_name: String::new(), - schema_name, + schema_name: schema_name.clone(), table_name: table_name.clone(), column_name: Self::get_str_with_null(&row, "column_name")?, }; - self.push_to_results(&mut results, &table_name, sequence_owner); + self.push_to_results(&mut results, &schema_name, &table_name, sequence_owner); } Ok(results) } - async fn get_tables(&mut self, tb: &str) -> anyhow::Result> { - let mut results: BTreeMap = BTreeMap::new(); + async fn get_tables( + &mut self, + sch: &str, + tb: &str, + ) -> anyhow::Result> { + let mut results: BTreeMap<(String, String), Table> = BTreeMap::new(); - let tb_filter = if !tb.is_empty() { - format!("AND c.table_name = '{}'", tb) + let tb_filter = if !sch.is_empty() { + if !self.schemas.contains(sch) { + return Ok(results); + } + if !tb.is_empty() { + format!("c.table_schema = '{}' AND c.table_name = '{}'", sch, tb) + } else { + format!("c.table_schema = '{}'", sch) + } + } else if !self.schemas.is_empty() { + format!("c.table_schema in ({})", self.get_schemas_str()) } else { - String::new() + return Ok(results); }; let sql = format!( @@ -352,10 +435,10 @@ impl PgStructFetcher { JOIN information_schema.tables t ON c.table_schema = t.table_schema AND c.table_name = t.table_name - WHERE c.table_schema ='{}' {} + WHERE {} AND t.table_type = 'BASE TABLE' ORDER BY c.table_schema, c.table_name, 
c.ordinal_position", - &self.schema, tb_filter + tb_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); @@ -365,7 +448,8 @@ impl PgStructFetcher { Self::get_str_with_null(&row, "table_name")?, ); - if self.filter_tb(&self.schema.clone(), &table_name) { + if !self.schemas.contains(&table_schema) || !self.filter_tb(&table_schema, &table_name) + { continue; } @@ -386,11 +470,11 @@ impl PgStructFetcher { ..Default::default() }; - if let Some(table) = results.get_mut(&table_name) { + if let Some(table) = results.get_mut(&(table_schema.clone(), table_name.clone())) { table.columns.push(column); } else { results.insert( - table_name.clone(), + (table_schema.clone(), table_name.clone()), Table { database_name: table_schema.clone(), schema_name: table_schema, @@ -403,8 +487,8 @@ impl PgStructFetcher { } // get column types - for (table_name, table) in results.iter_mut() { - let column_types = self.get_column_types(table_name).await?; + for ((table_schema, table_name), table) in results.iter_mut() { + let column_types = self.get_column_types(table_schema, table_name).await?; for column in table.columns.iter_mut() { column.column_type = column_types.get(&column.column_name).unwrap().to_owned(); } @@ -413,17 +497,21 @@ impl PgStructFetcher { Ok(results) } - async fn get_column_types(&mut self, tb: &str) -> anyhow::Result> { + async fn get_column_types( + &mut self, + schema: &str, + tb: &str, + ) -> anyhow::Result> { let fetcher = PgStructCheckFetcher { conn_pool: self.conn_pool.clone(), }; - let oid = fetcher.get_oid(&self.schema, tb).await?; + let oid = fetcher.get_oid(schema, tb).await?; if oid.is_empty() { anyhow::bail!( "Invalid OID: cannot be empty for schema: {} and table: {}", - self.schema, + schema, tb ); } @@ -449,14 +537,24 @@ impl PgStructFetcher { async fn get_constraints( &mut self, + sch: &str, tb: &str, - ) -> anyhow::Result>> { + ) -> anyhow::Result>> { let mut results = HashMap::new(); - let tb_filter = if !tb.is_empty() { - format!("AND rel.relname = '{}'", tb) + let tb_filter = if !sch.is_empty() { + if !self.schemas.contains(sch) { + return Ok(results); + } + if !tb.is_empty() { + format!("nsp.nspname='{}' AND rel.relname = '{}'", sch, tb) + } else { + format!("nsp.nspname = '{}'", sch) + } + } else if !self.schemas.is_empty() { + format!("nsp.nspname in ({})", self.get_schemas_str()) } else { - String::new() + return Ok(results); }; let sql = format!( @@ -470,9 +568,9 @@ impl PgStructFetcher { ON rel.oid = con.conrelid JOIN pg_catalog.pg_namespace nsp ON nsp.oid = connamespace - WHERE nsp.nspname ='{}' {} + WHERE {} ORDER BY nsp.nspname,rel.relname", - &self.schema, tb_filter + tb_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); @@ -488,19 +586,43 @@ impl PgStructFetcher { constraint_type: ConstraintType::from_str(&constraint_type, DbType::Pg), definition: Self::get_str_with_null(&row, "constraint_definition")?, }; - self.push_to_results(&mut results, &table_name, constraint); + self.push_to_results( + &mut results, + &constraint.schema_name.clone(), + &table_name, + constraint, + ); } Ok(results) } - async fn get_indexes(&mut self, tb: &str) -> anyhow::Result>> { + async fn get_indexes( + &mut self, + sch: &str, + tb: &str, + ) -> anyhow::Result>> { let mut results = HashMap::new(); - let tb_filter = if !tb.is_empty() { - format!("AND tablename = '{}'", tb) + let tb_filter = if !sch.is_empty() { + if !self.schemas.contains(sch) { + return Ok(results); + } + if !tb.is_empty() { + format!("schemaname='{}' AND tablename = '{}'", sch, tb) + } else { 
+ format!("schemaname = '{}'", sch) + } + } else if !self.schemas.is_empty() { + let schemas_str = self + .schemas + .iter() + .map(|s| format!("'{}'", s)) + .collect::>() + .join(","); + format!("schemaname in ({})", schemas_str) } else { - String::new() + return Ok(results); }; let sql = format!( @@ -509,8 +631,8 @@ impl PgStructFetcher { indexdef, COALESCE(tablespace, 'pg_default') AS tablespace, indexname FROM pg_indexes - WHERE schemaname = '{}' {}", - &self.schema, tb_filter + WHERE {}", + tb_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); @@ -527,7 +649,7 @@ impl PgStructFetcher { definition, ..Default::default() }; - self.push_to_results(&mut results, &table_name, index); + self.push_to_results(&mut results, &index.schema_name.clone(), &table_name, index); } Ok(results) @@ -535,14 +657,24 @@ impl PgStructFetcher { async fn get_table_comments( &mut self, + sch: &str, tb: &str, - ) -> anyhow::Result>> { + ) -> anyhow::Result>> { let mut results = HashMap::new(); - let tb_filter = if !tb.is_empty() { - format!("AND c.relname = '{}'", tb) + let tb_filter = if !sch.is_empty() { + if !self.schemas.contains(sch) { + return Ok(results); + } + if !tb.is_empty() { + format!("n.nspname='{}' AND c.relname = '{}'", sch, tb) + } else { + format!("n.nspname = '{}'", sch) + } + } else if !self.schemas.is_empty() { + format!("n.nspname in ({})", self.get_schemas_str()) } else { - String::new() + return Ok(results); }; let sql = format!( @@ -554,9 +686,9 @@ impl PgStructFetcher { ON n.oid = c.relnamespace LEFT JOIN pg_description d ON c.oid = d.objoid AND d.objsubid = 0 - WHERE n.nspname ='{}' {} + WHERE {} AND d.description IS NOT null", - &self.schema, tb_filter + tb_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); @@ -569,12 +701,12 @@ impl PgStructFetcher { let comment = Comment { comment_type: CommentType::Table, database_name: String::new(), - schema_name, + schema_name: schema_name.clone(), table_name: table_name.clone(), column_name: String::new(), comment: Self::get_str_with_null(&row, "description")?, }; - self.push_to_results(&mut results, &table_name, comment); + self.push_to_results(&mut results, &schema_name, &table_name, comment); } Ok(results) @@ -582,14 +714,24 @@ impl PgStructFetcher { async fn get_column_comments( &mut self, + sch: &str, tb: &str, - ) -> anyhow::Result>> { + ) -> anyhow::Result>> { let mut results = HashMap::new(); - let tb_filter = if !tb.is_empty() { - format!("AND c.relname = '{}'", tb) + let tb_filter = if !sch.is_empty() { + if !self.schemas.contains(sch) { + return Ok(results); + } + if !tb.is_empty() { + format!("n.nspname='{}' AND c.relname = '{}'", sch, tb) + } else { + format!("n.nspname = '{}'", sch) + } + } else if !self.schemas.is_empty() { + format!("n.nspname in ({})", self.get_schemas_str()) } else { - String::new() + return Ok(results); }; let sql = format!( @@ -604,10 +746,10 @@ impl PgStructFetcher { ON a.attrelid =c.oid LEFT JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname ='{}' {} + WHERE {} AND a.attnum >0 AND col_description(a.attrelid, a.attnum) is NOT null", - &self.schema, tb_filter + tb_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); @@ -621,12 +763,12 @@ impl PgStructFetcher { let comment = Comment { comment_type: CommentType::Column, database_name: String::new(), - schema_name, + schema_name: schema_name.clone(), table_name: table_name.clone(), column_name, comment: Self::get_str_with_null(&row, "comment")?, }; - self.push_to_results(&mut results, &table_name, 
comment); + self.push_to_results(&mut results, &schema_name, &table_name, comment); } Ok(results) @@ -892,7 +1034,7 @@ impl PgStructFetcher { async fn get_sequence_privilege(&mut self) -> anyhow::Result> { let mut results = Vec::new(); - let tables = self.get_tables("").await?; + let tables = self.get_tables("", "").await?; let mut sequence_map: HashMap> = HashMap::new(); for table in tables.values() { for column in &table.columns { @@ -1068,23 +1210,36 @@ impl PgStructFetcher { fn push_to_results( &mut self, - results: &mut HashMap>, + results: &mut HashMap<(String, String), Vec>, + schema_name: &str, table_name: &str, item: T, ) { - if self.filter_tb(&self.schema.clone(), table_name) { + if !self.schemas.contains(schema_name) || self.filter_tb(schema_name, table_name) { return; } - if let Some(exists) = results.get_mut(table_name) { + if let Some(exists) = results.get_mut(&(schema_name.to_owned(), table_name.to_owned())) { exists.push(item); } else { - results.insert(table_name.into(), vec![item]); + results.insert((schema_name.into(), table_name.into()), vec![item]); } } - fn get_result(&self, results: &mut HashMap>, table_name: &str) -> Vec { - results.remove(table_name).unwrap_or_default() + fn get_result( + &self, + results: &mut HashMap<(String, String), Vec>, + key: &(String, String), + ) -> Vec { + results.remove(key).unwrap_or_default() + } + + fn get_schemas_str(&self) -> String { + self.schemas + .iter() + .map(|s| format!("'{}'", s)) + .collect::>() + .join(",") } } diff --git a/dt-connector/src/sinker/pg/pg_checker.rs b/dt-connector/src/sinker/pg/pg_checker.rs index bbaae645..765f2609 100644 --- a/dt-connector/src/sinker/pg/pg_checker.rs +++ b/dt-connector/src/sinker/pg/pg_checker.rs @@ -1,4 +1,8 @@ -use std::{cmp, collections::HashMap, sync::Arc}; +use std::{ + cmp, + collections::{HashMap, HashSet}, + sync::Arc, +}; use async_trait::async_trait; use futures::TryStreamExt; @@ -168,24 +172,30 @@ impl PgChecker { let mut struct_fetcher = PgStructFetcher { conn_pool: self.conn_pool.to_owned(), - schema, + schema: schema.clone(), + schemas: HashSet::from([schema.clone()]), filter: None, }; let mut dst_statement = match &src_statement { StructStatement::PgCreateSchema(_) => { - let dst_statement = struct_fetcher.get_create_schema_statement().await?; - StructStatement::PgCreateSchema(dst_statement) + let mut dst_statements = + struct_fetcher.get_create_schema_statements(&schema).await?; + if dst_statements.is_empty() { + StructStatement::Unknown + } else { + StructStatement::PgCreateSchema(dst_statements.remove(0)) + } } StructStatement::PgCreateTable(statement) => { - let mut dst_statement = struct_fetcher - .get_create_table_statements(&statement.table.table_name) + let mut dst_statements = struct_fetcher + .get_create_table_statements(&schema, &statement.table.table_name) .await?; - if dst_statement.is_empty() { + if dst_statements.is_empty() { StructStatement::Unknown } else { - StructStatement::PgCreateTable(dst_statement.remove(0)) + StructStatement::PgCreateTable(dst_statements.remove(0)) } } diff --git a/dt-task/src/task_runner.rs b/dt-task/src/task_runner.rs index 8bb7baa6..060f0240 100644 --- a/dt-task/src/task_runner.rs +++ b/dt-task/src/task_runner.rs @@ -797,6 +797,9 @@ impl TaskRunner { _ => None, }; if let Some(db_batch_size) = db_batch_size { + if !TaskUtil::is_valid_db_batch_size(db_type, db_batch_size) { + bail! 
{Error::ConfigError(format!(r#"unsupported db_batch_size {} for {}"#, db_batch_size, db_type))} + } let schema_chunks: Vec> = schemas .chunks(db_batch_size) .map(|chunk| chunk.to_vec()) diff --git a/dt-task/src/task_util.rs b/dt-task/src/task_util.rs index 71af1899..7b6dccea 100644 --- a/dt-task/src/task_util.rs +++ b/dt-task/src/task_util.rs @@ -486,4 +486,9 @@ WHERE S3Client::new_with(rusoto_core::HttpClient::new().unwrap(), credentials, region) } + + pub fn is_valid_db_batch_size(_db_type: &DbType, batch_size: usize) -> bool { + // TODO: add check for different db types + batch_size > 0 && batch_size <= 50 + } } From 8c495070cfda6106a175e4de8033217ee094f7ae Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Mon, 22 Sep 2025 14:02:15 +0800 Subject: [PATCH 08/17] fix test --- .../src/meta_fetcher/pg/pg_struct_fetcher.rs | 23 ++++++++-------- dt-task/src/task_runner.rs | 26 ++++++++++++++++--- dt-task/src/task_util.rs | 10 +++++-- .../dst_clean.sql | 0 .../dst_prepare.sql | 0 .../src_clean.sql | 0 .../src_prepare.sql | 0 .../task_config.ini | 0 dt-tests/tests/pg_to_pg/struct_tests.rs | 4 +-- 9 files changed, 43 insertions(+), 20 deletions(-) rename dt-tests/tests/pg_to_pg/struct/{batch_test => parallel_test}/dst_clean.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test => parallel_test}/dst_prepare.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test => parallel_test}/src_clean.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test => parallel_test}/src_prepare.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test => parallel_test}/task_config.ini (100%) diff --git a/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs b/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs index 766621c8..17a4710d 100644 --- a/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs +++ b/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs @@ -110,7 +110,7 @@ impl PgStructFetcher { ) } else if !self.schemas.is_empty() { ( - format!("schema_name in ({})", self.get_schemas_str()), + format!("schema_name IN ({})", self.get_schemas_str()), self.schemas.clone(), ) } else { @@ -126,7 +126,7 @@ impl PgStructFetcher { let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); let mut schemas = HashSet::new(); - if let Some(row) = rows.try_next().await? { + while let Some(row) = rows.try_next().await? 
{ let schema_name = Self::get_str_with_null(&row, "schema_name")?; schemas.insert(schema_name); } @@ -170,8 +170,8 @@ impl PgStructFetcher { } else if !self.schemas.is_empty() { let schemas_str = &self.get_schemas_str(); ( - format!("ns.nspname in ({})", schemas_str), - format!("obj.sequence_schema in ({})", schemas_str), + format!("ns.nspname IN ({})", schemas_str), + format!("obj.sequence_schema IN ({})", schemas_str), ) } else { return Ok(results); @@ -348,7 +348,7 @@ impl PgStructFetcher { format!("ns.nspname = '{}'", sch) } } else if !self.schemas.is_empty() { - format!("ns.nspname in ({})", self.get_schemas_str()) + format!("ns.nspname IN ({})", self.get_schemas_str()) } else { return Ok(results); }; @@ -412,7 +412,7 @@ impl PgStructFetcher { format!("c.table_schema = '{}'", sch) } } else if !self.schemas.is_empty() { - format!("c.table_schema in ({})", self.get_schemas_str()) + format!("c.table_schema IN ({})", self.get_schemas_str()) } else { return Ok(results); }; @@ -448,8 +448,7 @@ impl PgStructFetcher { Self::get_str_with_null(&row, "table_name")?, ); - if !self.schemas.contains(&table_schema) || !self.filter_tb(&table_schema, &table_name) - { + if !self.schemas.contains(&table_schema) || self.filter_tb(&table_schema, &table_name) { continue; } @@ -552,7 +551,7 @@ impl PgStructFetcher { format!("nsp.nspname = '{}'", sch) } } else if !self.schemas.is_empty() { - format!("nsp.nspname in ({})", self.get_schemas_str()) + format!("nsp.nspname IN ({})", self.get_schemas_str()) } else { return Ok(results); }; @@ -620,7 +619,7 @@ impl PgStructFetcher { .map(|s| format!("'{}'", s)) .collect::>() .join(","); - format!("schemaname in ({})", schemas_str) + format!("schemaname IN ({})", schemas_str) } else { return Ok(results); }; @@ -672,7 +671,7 @@ impl PgStructFetcher { format!("n.nspname = '{}'", sch) } } else if !self.schemas.is_empty() { - format!("n.nspname in ({})", self.get_schemas_str()) + format!("n.nspname IN ({})", self.get_schemas_str()) } else { return Ok(results); }; @@ -729,7 +728,7 @@ impl PgStructFetcher { format!("n.nspname = '{}'", sch) } } else if !self.schemas.is_empty() { - format!("n.nspname in ({})", self.get_schemas_str()) + format!("n.nspname IN ({})", self.get_schemas_str()) } else { return Ok(results); }; diff --git a/dt-task/src/task_runner.rs b/dt-task/src/task_runner.rs index 060f0240..ccd2369d 100644 --- a/dt-task/src/task_runner.rs +++ b/dt-task/src/task_runner.rs @@ -196,7 +196,7 @@ impl TaskRunner { .await }); - let task_parallel_size = self.config.runtime.task_parallel_size; + let task_parallel_size = self.get_task_parallel_size(); let semaphore = Arc::new(tokio::sync::Semaphore::new(task_parallel_size)); let mut join_set: JoinSet<(String, anyhow::Result<()>)> = JoinSet::new(); @@ -797,9 +797,7 @@ impl TaskRunner { _ => None, }; if let Some(db_batch_size) = db_batch_size { - if !TaskUtil::is_valid_db_batch_size(db_type, db_batch_size) { - bail! {Error::ConfigError(format!(r#"unsupported db_batch_size {} for {}"#, db_batch_size, db_type))} - } + TaskUtil::validate_db_batch_size(db_type, db_batch_size)?; let schema_chunks: Vec> = schemas .chunks(db_batch_size) .map(|chunk| chunk.to_vec()) @@ -909,4 +907,24 @@ impl TaskRunner { } Ok(pending_tasks) } + + fn get_task_parallel_size(&self) -> usize { + match &self.config.extractor { + ExtractorConfig::MysqlSnapshot { .. } + | ExtractorConfig::PgSnapshot { .. } + | ExtractorConfig::FoxlakeS3 { .. } + | ExtractorConfig::MongoSnapshot { .. 
} => { + if self.config.runtime.task_parallel_size >= 1 { + self.config.runtime.task_parallel_size + } else { + // compatible with legacy config + self.config.runtime.tb_parallel_size + } + } + ExtractorConfig::MysqlStruct { .. } | ExtractorConfig::PgStruct { .. } => { + self.config.runtime.task_parallel_size + } + _ => 1, + } + } } diff --git a/dt-task/src/task_util.rs b/dt-task/src/task_util.rs index 7b6dccea..fa1b6ef8 100644 --- a/dt-task/src/task_util.rs +++ b/dt-task/src/task_util.rs @@ -1,5 +1,6 @@ use std::{str::FromStr, time::Duration}; +use anyhow::bail; use dt_common::config::config_enums::TaskType; use dt_common::config::extractor_config::ExtractorConfig; use dt_common::config::s3_config::S3Config; @@ -7,6 +8,7 @@ use dt_common::config::{ config_enums::DbType, meta_center_config::MetaCenterConfig, sinker_config::SinkerConfig, task_config::TaskConfig, }; +use dt_common::error::Error; use dt_common::log_info; use dt_common::meta::mysql::mysql_dbengine_meta_center::MysqlDbEngineMetaCenter; use dt_common::meta::{ @@ -487,8 +489,12 @@ WHERE S3Client::new_with(rusoto_core::HttpClient::new().unwrap(), credentials, region) } - pub fn is_valid_db_batch_size(_db_type: &DbType, batch_size: usize) -> bool { + pub fn validate_db_batch_size(_db_type: &DbType, db_batch_size: usize) -> anyhow::Result<()> { // TODO: add check for different db types - batch_size > 0 && batch_size <= 50 + if db_batch_size > 0 && db_batch_size <= 50 { + Ok(()) + } else { + bail! {Error::ConfigError(format!(r#"db_batch_size {} is not valid, should be in range (0, 50]"#, db_batch_size))} + } } } diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/dst_clean.sql b/dt-tests/tests/pg_to_pg/struct/parallel_test/dst_clean.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test/dst_clean.sql rename to dt-tests/tests/pg_to_pg/struct/parallel_test/dst_clean.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql b/dt-tests/tests/pg_to_pg/struct/parallel_test/dst_prepare.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql rename to dt-tests/tests/pg_to_pg/struct/parallel_test/dst_prepare.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/src_clean.sql b/dt-tests/tests/pg_to_pg/struct/parallel_test/src_clean.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test/src_clean.sql rename to dt-tests/tests/pg_to_pg/struct/parallel_test/src_clean.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/parallel_test/src_prepare.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql rename to dt-tests/tests/pg_to_pg/struct/parallel_test/src_prepare.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini b/dt-tests/tests/pg_to_pg/struct/parallel_test/task_config.ini similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini rename to dt-tests/tests/pg_to_pg/struct/parallel_test/task_config.ini diff --git a/dt-tests/tests/pg_to_pg/struct_tests.rs b/dt-tests/tests/pg_to_pg/struct_tests.rs index 560b005f..c9db988b 100644 --- a/dt-tests/tests/pg_to_pg/struct_tests.rs +++ b/dt-tests/tests/pg_to_pg/struct_tests.rs @@ -53,7 +53,7 @@ mod test { #[tokio::test] #[serial] - async fn struct_batch_test() { - TestBase::run_pg_struct_test("pg_to_pg/struct/batch_test").await; + async fn struct_parallel_test() { + TestBase::run_pg_struct_test("pg_to_pg/struct/parallel_test").await; } } From 
1ed817e4242c3229635e0d3a984955ba212dfefd Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Tue, 23 Sep 2025 15:39:31 +0800 Subject: [PATCH 09/17] into single task --- dt-common/src/config/extractor_config.rs | 2 + dt-common/src/config/runtime_config.rs | 3 - dt-common/src/config/task_config.rs | 5 +- .../extractor/mysql/mysql_struct_extractor.rs | 1 + .../src/extractor/pg/pg_struct_extractor.rs | 42 ++++-- .../src/meta_fetcher/pg/pg_struct_fetcher.rs | 1 - dt-connector/src/rdb_router.rs | 1 - dt-connector/src/sinker/pg/pg_checker.rs | 1 - dt-task/src/extractor_util.rs | 14 +- dt-task/src/task_runner.rs | 140 ++++++++++-------- dt-task/src/task_util.rs | 11 -- .../dst_clean.sql | 0 .../dst_prepare.sql | 0 .../src_clean.sql | 0 .../src_prepare.sql | 0 .../task_config.ini | 8 +- dt-tests/tests/pg_to_pg/struct_tests.rs | 4 +- 17 files changed, 131 insertions(+), 102 deletions(-) rename dt-tests/tests/pg_to_pg/struct/{parallel_test => batch_test}/dst_clean.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{parallel_test => batch_test}/dst_prepare.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{parallel_test => batch_test}/src_clean.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{parallel_test => batch_test}/src_prepare.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{parallel_test => batch_test}/task_config.ini (85%) diff --git a/dt-common/src/config/extractor_config.rs b/dt-common/src/config/extractor_config.rs index 710ddb11..2f7d36a0 100644 --- a/dt-common/src/config/extractor_config.rs +++ b/dt-common/src/config/extractor_config.rs @@ -9,6 +9,7 @@ pub enum ExtractorConfig { url: String, db: String, dbs: Vec, + db_batch_size: usize, }, PgStruct { @@ -16,6 +17,7 @@ pub enum ExtractorConfig { schema: String, schemas: Vec, do_global_structs: bool, + db_batch_size: usize, }, MysqlSnapshot { diff --git a/dt-common/src/config/runtime_config.rs b/dt-common/src/config/runtime_config.rs index 0edfb94a..00fc20f5 100644 --- a/dt-common/src/config/runtime_config.rs +++ b/dt-common/src/config/runtime_config.rs @@ -4,7 +4,4 @@ pub struct RuntimeConfig { pub log_dir: String, pub log4rs_file: String, pub tb_parallel_size: usize, - pub task_parallel_size: usize, - pub tb_batch_size: usize, - pub db_batch_size: usize, } diff --git a/dt-common/src/config/task_config.rs b/dt-common/src/config/task_config.rs index 226031dc..19f0a196 100644 --- a/dt-common/src/config/task_config.rs +++ b/dt-common/src/config/task_config.rs @@ -177,6 +177,7 @@ impl TaskConfig { url, db: String::new(), dbs: Vec::new(), + db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 1000), }, ExtractType::FoxlakeS3 => { @@ -236,6 +237,7 @@ impl TaskConfig { schema: String::new(), schemas: Vec::new(), do_global_structs: false, + db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 100), }, _ => bail! 
{ not_supported_err }, @@ -609,9 +611,6 @@ impl TaskConfig { "./log4rs.yaml".to_string(), ), tb_parallel_size: loader.get_with_default(RUNTIME, "tb_parallel_size", 1), - task_parallel_size: loader.get_with_default(RUNTIME, "task_parallel_size", 1), - tb_batch_size: loader.get_with_default(RUNTIME, "tb_batch_size", 1), - db_batch_size: loader.get_with_default(RUNTIME, "db_batch_size", 1), }) } diff --git a/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs b/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs index be38d6e9..28d1399f 100644 --- a/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs +++ b/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs @@ -20,6 +20,7 @@ pub struct MysqlStructExtractor { pub db: String, pub dbs: Vec, pub filter: RdbFilter, + pub db_batch_size: usize, } #[async_trait] diff --git a/dt-connector/src/extractor/pg/pg_struct_extractor.rs b/dt-connector/src/extractor/pg/pg_struct_extractor.rs index 40957187..ffd8c447 100644 --- a/dt-connector/src/extractor/pg/pg_struct_extractor.rs +++ b/dt-connector/src/extractor/pg/pg_struct_extractor.rs @@ -1,4 +1,8 @@ +use std::collections::HashSet; + +use anyhow::bail; use async_trait::async_trait; +use dt_common::error::Error; use dt_common::meta::struct_meta::struct_data::StructData; use dt_common::{log_info, rdb_filter::RdbFilter}; @@ -16,20 +20,29 @@ use crate::{ pub struct PgStructExtractor { pub base_extractor: BaseExtractor, pub conn_pool: Pool, - pub schema: String, pub schemas: Vec, pub do_global_structs: bool, pub filter: RdbFilter, + pub db_batch_size: usize, } #[async_trait] impl Extractor for PgStructExtractor { async fn extract(&mut self) -> anyhow::Result<()> { - log_info!( - "PgStructExtractor starts, schemas: {}", - self.schemas.join(",") - ); - self.extract_internal().await?; + log_info!("PgStructExtractor starts..."); + let schema_chunks: Vec> = self + .schemas + .chunks(self.db_batch_size) + .map(|chunk| chunk.to_vec()) + .collect(); + for schema_chunk in schema_chunks { + log_info!( + "PgStructExtractor extracts schemas: {}", + schema_chunk.join(",") + ); + self.extract_internal(schema_chunk.into_iter().collect()) + .await?; + } self.base_extractor.wait_task_finish().await } @@ -39,11 +52,10 @@ impl Extractor for PgStructExtractor { } impl PgStructExtractor { - pub async fn extract_internal(&mut self) -> anyhow::Result<()> { + pub async fn extract_internal(&mut self, schemas: HashSet) -> anyhow::Result<()> { let mut pg_fetcher = PgStructFetcher { conn_pool: self.conn_pool.to_owned(), - schema: self.schema.clone(), - schemas: self.schemas.iter().cloned().collect(), + schemas: schemas, filter: Some(self.filter.to_owned()), }; @@ -73,9 +85,19 @@ impl PgStructExtractor { pub async fn push_dt_data(&mut self, statement: StructStatement) -> anyhow::Result<()> { let struct_data = StructData { - schema: self.schema.clone(), + schema: "".to_string(), statement, }; self.base_extractor.push_struct(struct_data).await } + + pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<()> { + let max_db_batch_size = 100; + let min_db_batch_size = 1; + if db_batch_size < min_db_batch_size || db_batch_size > max_db_batch_size { + bail! 
{Error::ConfigError(format!(r#"db_batch_size {} is not valid, should be in range ({}, {})"#, db_batch_size, min_db_batch_size, max_db_batch_size))} + } else { + Ok(()) + } + } } diff --git a/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs b/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs index 17a4710d..07330975 100644 --- a/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs +++ b/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs @@ -33,7 +33,6 @@ use super::pg_struct_check_fetcher::PgStructCheckFetcher; pub struct PgStructFetcher { pub conn_pool: Pool, - pub schema: String, pub schemas: HashSet, pub filter: Option, } diff --git a/dt-connector/src/rdb_router.rs b/dt-connector/src/rdb_router.rs index b36e7ca8..40dd8a7d 100644 --- a/dt-connector/src/rdb_router.rs +++ b/dt-connector/src/rdb_router.rs @@ -213,7 +213,6 @@ impl RdbRouter { _ => {} } - struct_data.schema = self.get_schema_map(&struct_data.schema).into(); struct_data } diff --git a/dt-connector/src/sinker/pg/pg_checker.rs b/dt-connector/src/sinker/pg/pg_checker.rs index 765f2609..0875b6e6 100644 --- a/dt-connector/src/sinker/pg/pg_checker.rs +++ b/dt-connector/src/sinker/pg/pg_checker.rs @@ -172,7 +172,6 @@ impl PgChecker { let mut struct_fetcher = PgStructFetcher { conn_pool: self.conn_pool.to_owned(), - schema: schema.clone(), schemas: HashSet::from([schema.clone()]), filter: None, }; diff --git a/dt-task/src/extractor_util.rs b/dt-task/src/extractor_util.rs index a2aa199d..25146c6b 100644 --- a/dt-task/src/extractor_util.rs +++ b/dt-task/src/extractor_util.rs @@ -332,7 +332,13 @@ impl ExtractorUtil { Box::new(extractor) } - ExtractorConfig::MysqlStruct { url, db, dbs, .. } => { + ExtractorConfig::MysqlStruct { + url, + db, + dbs, + db_batch_size, + .. + } => { // TODO, pass max_connections as parameter let conn_pool = TaskUtil::create_mysql_conn_pool(&url, 2, enable_sqlx_log, false).await?; @@ -342,27 +348,29 @@ impl ExtractorUtil { dbs, filter, base_extractor, + db_batch_size, }; Box::new(extractor) } ExtractorConfig::PgStruct { url, - schema, schemas, do_global_structs, + db_batch_size, .. } => { + PgStructExtractor::validate_db_batch_size(db_batch_size)?; // TODO, pass max_connections as parameter let conn_pool = TaskUtil::create_pg_conn_pool(&url, 2, enable_sqlx_log, false).await?; let extractor = PgStructExtractor { conn_pool, - schema, schemas, do_global_structs, filter, base_extractor, + db_batch_size, }; Box::new(extractor) } diff --git a/dt-task/src/task_runner.rs b/dt-task/src/task_runner.rs index ccd2369d..b5826878 100644 --- a/dt-task/src/task_runner.rs +++ b/dt-task/src/task_runner.rs @@ -140,9 +140,26 @@ impl TaskRunner { .await; match &self.config.extractor { - ExtractorConfig::MysqlStruct { url, .. } - | ExtractorConfig::PgStruct { url, .. } - | ExtractorConfig::MysqlSnapshot { url, .. } + ExtractorConfig::MysqlStruct { url, .. } | ExtractorConfig::PgStruct { url, .. } => { + let mut pending_task = self + .build_pending_tasks(url, &router, &snapshot_resumer, &cdc_resumer, false) + .await?; + if !pending_task.is_empty() { + if let Some(task_context) = pending_task.pop_front() { + self.clone() + .start_single_task( + &task_context.extractor_config, + &router, + &snapshot_resumer, + &cdc_resumer, + false, + ) + .await? + } + } + } + + ExtractorConfig::MysqlSnapshot { url, .. } | ExtractorConfig::PgSnapshot { url, .. } | ExtractorConfig::MongoSnapshot { url, .. } | ExtractorConfig::FoxlakeS3 { url, .. 
} => { @@ -175,7 +192,7 @@ impl TaskRunner { cdc_resumer: &CdcResumer, ) -> anyhow::Result<()> { let mut pending_tasks = self - .build_pending_tasks(url, router, snapshot_resumer, cdc_resumer) + .build_pending_tasks(url, router, snapshot_resumer, cdc_resumer, true) .await?; // start a thread to flush global monitors @@ -768,13 +785,10 @@ impl TaskRunner { router: &'a RdbRouter, snapshot_resumer: &'a SnapshotResumer, cdc_resumer: &'a CdcResumer, + is_multi_task: bool, ) -> anyhow::Result>> { let db_type = &self.config.extractor_basic.db_type; let filter = RdbFilter::from_config(&self.config.filter, db_type)?; - let task_type_option = build_task_type( - &self.config.extractor_basic.extract_type, - &self.config.sinker_basic.sink_type, - ); let schemas = TaskUtil::list_schemas(url, db_type) .await? @@ -782,51 +796,63 @@ impl TaskRunner { .filter(|schema| !filter.filter_schema(schema)) .map(|s| s.to_owned()) .collect::>(); - if let Some(task_type) = task_type_option { - let record_count = - TaskUtil::estimate_record_count(&task_type, url, db_type, &schemas, &filter) - .await?; - self.task_monitor - .add_no_window_metrics(TaskMetricsType::ExtractorPlanRecords, record_count); + + if is_multi_task { + let task_type_option = build_task_type( + &self.config.extractor_basic.extract_type, + &self.config.sinker_basic.sink_type, + ); + if let Some(task_type) = task_type_option { + let record_count = + TaskUtil::estimate_record_count(&task_type, url, db_type, &schemas, &filter) + .await?; + self.task_monitor + .add_no_window_metrics(TaskMetricsType::ExtractorPlanRecords, record_count); + } } + let mut pending_tasks = VecDeque::new(); - let db_batch_size = match &self.config.extractor { - ExtractorConfig::MysqlStruct { .. } | ExtractorConfig::PgStruct { .. } => { - Some(self.config.runtime.db_batch_size) - } - _ => None, + let is_db_extractor_config = match &self.config.extractor { + ExtractorConfig::MysqlStruct { .. } | ExtractorConfig::PgStruct { .. } => true, + _ => false, }; - if let Some(db_batch_size) = db_batch_size { - TaskUtil::validate_db_batch_size(db_type, db_batch_size)?; - let schema_chunks: Vec> = schemas - .chunks(db_batch_size) - .map(|chunk| chunk.to_vec()) - .collect(); - for (flag, schema_chunk) in schema_chunks.iter().enumerate() { - let db_extractor_config = match &self.config.extractor { - ExtractorConfig::MysqlStruct { url, db, .. } => ExtractorConfig::MysqlStruct { - url: url.clone(), - db: db.clone(), - dbs: schema_chunk.clone(), - }, - ExtractorConfig::PgStruct { url, schema, .. } => ExtractorConfig::PgStruct { - url: url.clone(), - schema: schema.clone(), - schemas: schema_chunk.clone(), - do_global_structs: flag == 0, - }, - _ => { - bail! {Error::ConfigError("unsupported extractor config for `runtime.db_batch_size`".into())} - } - }; - pending_tasks.push_back(TaskContext { - extractor_config: db_extractor_config, - router: router, - snapshot_resumer: snapshot_resumer, - cdc_resumer: cdc_resumer, - id: schema_chunk.join(","), - }); - } + if is_db_extractor_config { + let db_extractor_config = match &self.config.extractor { + ExtractorConfig::MysqlStruct { + url, + db, + db_batch_size, + .. + } => ExtractorConfig::MysqlStruct { + url: url.clone(), + db: db.clone(), + dbs: schemas.clone(), + db_batch_size: db_batch_size.clone(), + }, + ExtractorConfig::PgStruct { + url, + schema, + do_global_structs, + db_batch_size, + .. 
+ } => ExtractorConfig::PgStruct { + url: url.clone(), + schema: schema.clone(), + schemas: schemas, + do_global_structs: do_global_structs.clone(), + db_batch_size: db_batch_size.clone(), + }, + _ => { + bail! {Error::ConfigError("unsupported extractor config type".into())} + } + }; + pending_tasks.push_back(TaskContext { + extractor_config: db_extractor_config, + router: router, + snapshot_resumer: snapshot_resumer, + cdc_resumer: cdc_resumer, + id: "".to_string(), + }); } else { for schema in schemas.iter() { // find pending tables @@ -892,7 +918,7 @@ impl TaskRunner { }, _ => { - bail! {Error::ConfigError("unsupported extractor config for `runtime.task_parallel_size`".into())}; + bail! {Error::ConfigError("unsupported extractor config for `runtime.tb_parallel_size`".into())}; } }; pending_tasks.push_back(TaskContext { @@ -913,17 +939,7 @@ impl TaskRunner { ExtractorConfig::MysqlSnapshot { .. } | ExtractorConfig::PgSnapshot { .. } | ExtractorConfig::FoxlakeS3 { .. } - | ExtractorConfig::MongoSnapshot { .. } => { - if self.config.runtime.task_parallel_size >= 1 { - self.config.runtime.task_parallel_size - } else { - // compatible with legacy config - self.config.runtime.tb_parallel_size - } - } - ExtractorConfig::MysqlStruct { .. } | ExtractorConfig::PgStruct { .. } => { - self.config.runtime.task_parallel_size - } + | ExtractorConfig::MongoSnapshot { .. } => self.config.runtime.tb_parallel_size, _ => 1, } } diff --git a/dt-task/src/task_util.rs b/dt-task/src/task_util.rs index fa1b6ef8..71af1899 100644 --- a/dt-task/src/task_util.rs +++ b/dt-task/src/task_util.rs @@ -1,6 +1,5 @@ use std::{str::FromStr, time::Duration}; -use anyhow::bail; use dt_common::config::config_enums::TaskType; use dt_common::config::extractor_config::ExtractorConfig; use dt_common::config::s3_config::S3Config; @@ -8,7 +7,6 @@ use dt_common::config::{ config_enums::DbType, meta_center_config::MetaCenterConfig, sinker_config::SinkerConfig, task_config::TaskConfig, }; -use dt_common::error::Error; use dt_common::log_info; use dt_common::meta::mysql::mysql_dbengine_meta_center::MysqlDbEngineMetaCenter; use dt_common::meta::{ @@ -488,13 +486,4 @@ WHERE S3Client::new_with(rusoto_core::HttpClient::new().unwrap(), credentials, region) } - - pub fn validate_db_batch_size(_db_type: &DbType, db_batch_size: usize) -> anyhow::Result<()> { - // TODO: add check for different db types - if db_batch_size > 0 && db_batch_size <= 50 { - Ok(()) - } else { - bail! 
{Error::ConfigError(format!(r#"db_batch_size {} is not valid, should be in range (0, 50]"#, db_batch_size))} - } - } } diff --git a/dt-tests/tests/pg_to_pg/struct/parallel_test/dst_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/dst_clean.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/parallel_test/dst_clean.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/dst_clean.sql diff --git a/dt-tests/tests/pg_to_pg/struct/parallel_test/dst_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/parallel_test/dst_prepare.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql diff --git a/dt-tests/tests/pg_to_pg/struct/parallel_test/src_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/src_clean.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/parallel_test/src_clean.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/src_clean.sql diff --git a/dt-tests/tests/pg_to_pg/struct/parallel_test/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/parallel_test/src_prepare.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql diff --git a/dt-tests/tests/pg_to_pg/struct/parallel_test/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini similarity index 85% rename from dt-tests/tests/pg_to_pg/struct/parallel_test/task_config.ini rename to dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini index 7b2612c9..036c436f 100644 --- a/dt-tests/tests/pg_to_pg/struct/parallel_test/task_config.ini +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini @@ -2,15 +2,15 @@ extract_type=struct db_type=pg url={pg_extractor_url} -batch_size=2 +db_batch_size=2 [sinker] sink_type=struct db_type=pg batch_size=1 url={pg_sinker_url} -# conflict_policy=interrupt -conflict_policy=ignore +conflict_policy=interrupt +# conflict_policy=ignore [filter] do_dbs=struct_it_pg2pg_* @@ -29,8 +29,6 @@ col_map= log_level=info log4rs_file=./log4rs.yaml log_dir=./logs -task_parallel_size=2 -db_batch_size=2 [parallelizer] parallel_type=serial diff --git a/dt-tests/tests/pg_to_pg/struct_tests.rs b/dt-tests/tests/pg_to_pg/struct_tests.rs index c9db988b..560b005f 100644 --- a/dt-tests/tests/pg_to_pg/struct_tests.rs +++ b/dt-tests/tests/pg_to_pg/struct_tests.rs @@ -53,7 +53,7 @@ mod test { #[tokio::test] #[serial] - async fn struct_parallel_test() { - TestBase::run_pg_struct_test("pg_to_pg/struct/parallel_test").await; + async fn struct_batch_test() { + TestBase::run_pg_struct_test("pg_to_pg/struct/batch_test").await; } } From a4a6d01933e8e697cbfab1a36ecb5c4b28d368bf Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Tue, 23 Sep 2025 20:02:13 +0800 Subject: [PATCH 10/17] add test --- .../src/extractor/pg/pg_struct_extractor.rs | 12 +- .../dst_clean.sql | 0 .../dst_prepare.sql | 0 .../src_clean.sql | 0 .../src_prepare.sql | 0 .../task_config.ini | 0 .../struct/batch_test_2/check/task_config.ini | 36 ++++ .../batch_test_2/src_to_dst/dst_clean.sql | 1 + .../batch_test_2/src_to_dst/dst_prepare.sql | 15 ++ .../batch_test_2/src_to_dst/src_clean.sql | 1 + .../batch_test_2/src_to_dst/src_prepare.sql | 158 ++++++++++++++++++ .../batch_test_2/src_to_dst/task_config.ini | 39 +++++ dt-tests/tests/pg_to_pg/struct_tests.rs | 14 +- 13 files changed, 270 insertions(+), 6 deletions(-) rename dt-tests/tests/pg_to_pg/struct/{batch_test => 
batch_test_1}/dst_clean.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test => batch_test_1}/dst_prepare.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test => batch_test_1}/src_clean.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test => batch_test_1}/src_prepare.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test => batch_test_1}/task_config.ini (100%) create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test_2/check/task_config.ini create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_clean.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_prepare.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_clean.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_prepare.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/task_config.ini diff --git a/dt-connector/src/extractor/pg/pg_struct_extractor.rs b/dt-connector/src/extractor/pg/pg_struct_extractor.rs index ffd8c447..aca1845a 100644 --- a/dt-connector/src/extractor/pg/pg_struct_extractor.rs +++ b/dt-connector/src/extractor/pg/pg_struct_extractor.rs @@ -35,12 +35,12 @@ impl Extractor for PgStructExtractor { .chunks(self.db_batch_size) .map(|chunk| chunk.to_vec()) .collect(); - for schema_chunk in schema_chunks { + for (flag, schema_chunk) in schema_chunks.into_iter().enumerate() { log_info!( "PgStructExtractor extracts schemas: {}", schema_chunk.join(",") ); - self.extract_internal(schema_chunk.into_iter().collect()) + self.extract_internal(schema_chunk.into_iter().collect(), flag == 0) .await?; } self.base_extractor.wait_task_finish().await @@ -52,7 +52,11 @@ impl Extractor for PgStructExtractor { } impl PgStructExtractor { - pub async fn extract_internal(&mut self, schemas: HashSet) -> anyhow::Result<()> { + pub async fn extract_internal( + &mut self, + schemas: HashSet, + do_global_structs: bool, + ) -> anyhow::Result<()> { let mut pg_fetcher = PgStructFetcher { conn_pool: self.conn_pool.to_owned(), schemas: schemas, @@ -71,7 +75,7 @@ impl PgStructExtractor { .await?; } - if self.do_global_structs && !self.filter.filter_structure(&StructureType::Rbac) { + if do_global_structs && !self.filter.filter_structure(&StructureType::Rbac) { // do rbac init let rbac_statements = pg_fetcher.get_create_rbac_statements().await?; for statement in rbac_statements { diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/dst_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test_1/dst_clean.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test/dst_clean.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test_1/dst_clean.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test_1/dst_prepare.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test/dst_prepare.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test_1/dst_prepare.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/src_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test_1/src_clean.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test/src_clean.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test_1/src_clean.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test_1/src_prepare.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test/src_prepare.sql rename to 
dt-tests/tests/pg_to_pg/struct/batch_test_1/src_prepare.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test_1/task_config.ini similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test/task_config.ini rename to dt-tests/tests/pg_to_pg/struct/batch_test_1/task_config.ini diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/check/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test_2/check/task_config.ini new file mode 100644 index 00000000..220a2620 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test_2/check/task_config.ini @@ -0,0 +1,36 @@ +[extractor] +db_type=pg +extract_type=struct +url={pg_extractor_url} +db_batch_size=10 + +[sinker] +db_type=pg +sink_type=check +url={pg_sinker_url} +batch_size=2 + +[filter] +do_dbs=struct_it_pg2pg_* +ignore_dbs= +do_tbs= +ignore_tbs= +do_events= + +[router] +db_map= +tb_map= +col_map= + +[parallelizer] +parallel_type=rdb_check +parallel_size=2 + +[pipeline] +buffer_size=4 +checkpoint_interval_secs=1 + +[runtime] +log_level=error +log4rs_file=./log4rs.yaml +log_dir=./logs \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_clean.sql new file mode 100644 index 00000000..e88cc948 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_clean.sql @@ -0,0 +1 @@ +-- drop schema if exists struct_it_pg2pg_1 CASCADE; \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_prepare.sql new file mode 100644 index 00000000..e771d177 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_prepare.sql @@ -0,0 +1,15 @@ +-- Use a DO block and a FOR loop to create the schemas and their objects dynamically. +``` +DO $$ +DECLARE + i INT; + schema_name TEXT; +BEGIN + FOR i IN 1..10 LOOP + schema_name := 'struct_it_pg2pg_' || i; + + -- Drop Schemas + EXECUTE format('DROP SCHEMA IF EXISTS %I CASCADE', schema_name); + END LOOP; +END $$; +``` diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_clean.sql new file mode 100644 index 00000000..e88cc948 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_clean.sql @@ -0,0 +1 @@ +-- drop schema if exists struct_it_pg2pg_1 CASCADE; \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_prepare.sql new file mode 100644 index 00000000..f037f8fa --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_prepare.sql @@ -0,0 +1,158 @@ +-- Use a DO block and a FOR loop to create the schemas and their objects dynamically. 
+``` +DO $$ +DECLARE + i INT; + schema_name TEXT; +BEGIN + + FOR i IN 1..10 LOOP + schema_name := 'struct_it_pg2pg_' || i; + + -- Drop and Create the Schema + EXECUTE format('DROP SCHEMA IF EXISTS %I CASCADE', schema_name); + EXECUTE format('CREATE SCHEMA %I', schema_name); + + -- all basic column types: + EXECUTE format(' + CREATE TABLE %I.full_column_type ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + char_col CHAR(10), + text_col TEXT, + boolean_col BOOLEAN, + smallint_col SMALLINT, + integer_col INTEGER, + bigint_col BIGINT, + decimal_col DECIMAL(10, 2), + numeric_col NUMERIC(10, 2), + real_col REAL, + double_precision_col DOUBLE PRECISION, + date_col DATE, + time_col TIME, + timestamp_col TIMESTAMP, + interval_col INTERVAL, + bytea_col BYTEA, + uuid_col UUID, + xml_col XML, + json_col JSON, + jsonb_col JSONB, + point_col POINT, + line_col LINE, + lseg_col LSEG, + box_col BOX, + path_col PATH, + polygon_col POLYGON, + circle_col CIRCLE + )', schema_name); + + -- array column types: + EXECUTE format(' + CREATE TABLE %I.array_table ( + pk SERIAL, + int_array INT[], + bigint_array BIGINT[], + text_array TEXT[], + char_array CHAR(10) [], + varchar_array VARCHAR(10) [], + date_array DATE[], + numeric_array NUMERIC(10, 2) [], + varnumeric_array NUMERIC[3], + inet_array INET[], + cidr_array CIDR[], + macaddr_array MACADDR[], + tsrange_array TSRANGE[], + tstzrange_array TSTZRANGE[], + daterange_array DATERANGE[], + int4range_array INT4RANGE[], + numerange_array NUMRANGE[], + int8range_array INT8RANGE[], + uuid_array UUID[], + json_array json[], + jsonb_array jsonb[], + oid_array OID[], + PRIMARY KEY(pk) + )', schema_name); + + -- all check types(without fk and exclude): + EXECUTE format(' + CREATE TABLE %I.full_constraint_type ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + unique_col VARCHAR(255) UNIQUE, + not_null_col VARCHAR(255) NOT NULL, + check_col VARCHAR(255) CHECK (char_length(check_col) > 3) + )', schema_name); + + -- all index types: + EXECUTE format(' + CREATE TABLE %I.full_index_type ( + id SERIAL PRIMARY KEY, + unique_col VARCHAR(255) NOT NULL, + index_col VARCHAR(255), + fulltext_col TSVECTOR, + spatial_col POINT NOT NULL, + simple_index_col VARCHAR(255), + composite_index_col1 VARCHAR(255), + composite_index_col2 VARCHAR(255), + composite_index_col3 VARCHAR(255) + )', schema_name); + + EXECUTE format('CREATE UNIQUE INDEX unique_index ON %I.full_index_type (unique_col)', schema_name); + EXECUTE format('CREATE INDEX index_index ON %I.full_index_type (index_col)', schema_name); + EXECUTE format('CREATE INDEX fulltext_index ON %I.full_index_type USING gin(fulltext_col)', schema_name); + EXECUTE format('CREATE INDEX spatial_index ON %I.full_index_type USING gist(spatial_col)', schema_name); + EXECUTE format('CREATE INDEX simple_index ON %I.full_index_type (simple_index_col)', schema_name); + EXECUTE format('CREATE INDEX composite_index ON %I.full_index_type (composite_index_col1, composite_index_col2, composite_index_col3)', schema_name); + + -- table comments: + EXECUTE format('COMMENT ON TABLE %I.full_column_type IS %L', schema_name, 'Comment on full_column_type.'); + EXECUTE format('COMMENT ON TABLE %I.full_index_type IS %L', schema_name, 'Comment on full_index_type.'); + + -- column comments: + EXECUTE format('COMMENT ON COLUMN %I.full_column_type.id IS %L', schema_name, 'Comment on full_column_type.id.'); + EXECUTE format('COMMENT ON COLUMN %I.full_index_type.id IS %L', schema_name, 'Comment on full_index_type.id.'); + + -- sequences + + -- 
case 1: sequences created automatically when creating table + EXECUTE format('CREATE TABLE %I.sequence_test_1 (seq_1 SERIAL, seq_2 BIGSERIAL, seq_3 SMALLSERIAL)', schema_name); + + -- case 2: create independent sequences, then alter their owners + EXECUTE format('CREATE SEQUENCE %I.sequence_test_2_seq_1', schema_name); + EXECUTE format('CREATE SEQUENCE %I.sequence_test_2_seq_2', schema_name); + EXECUTE format('CREATE SEQUENCE %I.sequence_test_2_seq_3', schema_name); + EXECUTE format('CREATE TABLE %I.sequence_test_2 (seq_1 INTEGER, seq_2 BIGINT, seq_3 SMALLINT)', schema_name); + EXECUTE format('ALTER SEQUENCE %I.sequence_test_2_seq_1 OWNED BY %I.sequence_test_2.seq_1', schema_name, schema_name); + EXECUTE format('ALTER SEQUENCE %I.sequence_test_2_seq_2 OWNED BY %I.sequence_test_2.seq_2', schema_name, schema_name); + EXECUTE format('ALTER SEQUENCE %I.sequence_test_2_seq_3 OWNED BY %I.sequence_test_2.seq_3', schema_name, schema_name); + + -- case 3: create independent sequences, use them in column defaults without ownership + EXECUTE format('CREATE SEQUENCE %I.sequence_test_3_seq_2', schema_name); + EXECUTE format('CREATE SEQUENCE %I."sequence_test_3_seq.\d@_3"', schema_name); + EXECUTE format(' + CREATE TABLE %I.sequence_test_3 ( + seq_1 SERIAL, + seq_2 BIGINT DEFAULT nextval(%L), + seq_3 SMALLINT DEFAULT nextval(%L) + )', schema_name, schema_name || '.sequence_test_3_seq_2', schema_name || '."sequence_test_3_seq.\d@_3"'); + + -- case 4: create independent sequences and never used by any tables + EXECUTE format('CREATE SEQUENCE %I.sequence_test_4_seq_1', schema_name); + + -- test view filtered + EXECUTE format('CREATE VIEW %I.full_column_type_view AS SELECT * FROM %I.full_column_type', schema_name, schema_name); + + -- special character + EXECUTE format(' + CREATE TABLE %I."special_character_$1#@*_table" ( + id SERIAL PRIMARY KEY, + varchar_col VARCHAR(255) NOT NULL, + unique_col VARCHAR(255) UNIQUE, + not_null_col VARCHAR(255) NOT NULL, + check_col VARCHAR(255) CHECK (char_length(check_col) > 3) + )', schema_name); + + END LOOP; +END $$; +``` \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/task_config.ini new file mode 100644 index 00000000..2a5fe98b --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/task_config.ini @@ -0,0 +1,39 @@ +[extractor] +extract_type=struct +db_type=pg +url={pg_extractor_url} +db_batch_size=10 + +[sinker] +sink_type=struct +db_type=pg +batch_size=1 +url={pg_sinker_url} +# conflict_policy=interrupt +conflict_policy=ignore + +[filter] +do_dbs=struct_it_pg2pg_* +ignore_dbs= +do_tbs= +ignore_tbs= +do_events= +# do_structures=database,table,constraint,sequence,comment,index + +[router] +db_map= +tb_map= +col_map= + +[runtime] +log_level=error +log4rs_file=./log4rs.yaml +log_dir=./logs + +[parallelizer] +parallel_type=serial +parallel_size=1 + +[pipeline] +checkpoint_interval_secs=1 +buffer_size=100 \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct_tests.rs b/dt-tests/tests/pg_to_pg/struct_tests.rs index 560b005f..9354d1bb 100644 --- a/dt-tests/tests/pg_to_pg/struct_tests.rs +++ b/dt-tests/tests/pg_to_pg/struct_tests.rs @@ -53,7 +53,17 @@ mod test { #[tokio::test] #[serial] - async fn struct_batch_test() { - TestBase::run_pg_struct_test("pg_to_pg/struct/batch_test").await; + async fn struct_batch_test_1() { + TestBase::run_pg_struct_test("pg_to_pg/struct/batch_test_1").await; + } + + #[tokio::test] + 
#[serial] + async fn struct_batch_test_2() { + let mut runner = RdbStructTestRunner::new("pg_to_pg/struct/batch_test_2/src_to_dst") + .await + .unwrap(); + runner.run_struct_test_without_check().await.unwrap(); + // TestBase::run_check_test("pg_to_pg/struct/batch_test_2/check").await; } } From 5c8c718e0f3b8c7cf3b82672f9dbe791035997b0 Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Thu, 25 Sep 2025 17:10:40 +0800 Subject: [PATCH 11/17] mysql struct fetcher --- dt-common/src/config/task_config.rs | 4 +- .../extractor/mysql/mysql_struct_extractor.rs | 47 +++- .../src/extractor/pg/pg_struct_extractor.rs | 2 +- .../mysql/mysql_struct_fetcher.rs | 251 ++++++++++++------ .../src/meta_fetcher/pg/pg_struct_fetcher.rs | 10 +- .../src/sinker/mysql/mysql_checker.rs | 15 +- dt-task/src/extractor_util.rs | 3 +- .../{ => basic_test}/dst_prepare.sql | 0 .../{ => basic_test}/expect_ddl_8.0.sql | 0 .../{ => basic_test}/src_prepare.sql | 0 .../{ => basic_test}/task_config.ini | 9 +- .../bench_test_1/check/task_config.ini | 27 ++ .../bench_test_1/src_to_dst/dst_prepare.sql | 35 +++ .../bench_test_1/src_to_dst/src_prepare.sql | 60 +++++ .../bench_test_1/src_to_dst/task_config.ini | 28 ++ dt-tests/tests/mysql_to_mysql/struct_tests.rs | 17 +- .../basic_test}/dst_clean.sql | 0 .../basic_test}/dst_prepare.sql | 0 .../basic_test}/src_clean.sql | 0 .../basic_test}/src_prepare.sql | 0 .../basic_test}/task_config.ini | 0 .../bench_test_1}/check/task_config.ini | 2 +- .../bench_test_1}/src_to_dst/dst_clean.sql | 0 .../bench_test_1}/src_to_dst/dst_prepare.sql | 0 .../bench_test_1}/src_to_dst/src_clean.sql | 0 .../bench_test_1}/src_to_dst/src_prepare.sql | 0 .../bench_test_1}/src_to_dst/task_config.ini | 2 +- dt-tests/tests/pg_to_pg/struct_tests.rs | 15 +- 28 files changed, 411 insertions(+), 116 deletions(-) rename dt-tests/tests/mysql_to_mysql/struct/batch_test/{ => basic_test}/dst_prepare.sql (100%) rename dt-tests/tests/mysql_to_mysql/struct/batch_test/{ => basic_test}/expect_ddl_8.0.sql (100%) rename dt-tests/tests/mysql_to_mysql/struct/batch_test/{ => basic_test}/src_prepare.sql (100%) rename dt-tests/tests/mysql_to_mysql/struct/batch_test/{ => basic_test}/task_config.ini (76%) create mode 100644 dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/check/task_config.ini create mode 100644 dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql create mode 100644 dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql create mode 100644 dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/task_config.ini rename dt-tests/tests/pg_to_pg/struct/{batch_test_1 => batch_test/basic_test}/dst_clean.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test_1 => batch_test/basic_test}/dst_prepare.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test_1 => batch_test/basic_test}/src_clean.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test_1 => batch_test/basic_test}/src_prepare.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test_1 => batch_test/basic_test}/task_config.ini (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test_2 => batch_test/bench_test_1}/check/task_config.ini (96%) rename dt-tests/tests/pg_to_pg/struct/{batch_test_2 => batch_test/bench_test_1}/src_to_dst/dst_clean.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test_2 => batch_test/bench_test_1}/src_to_dst/dst_prepare.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test_2 => 
batch_test/bench_test_1}/src_to_dst/src_clean.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test_2 => batch_test/bench_test_1}/src_to_dst/src_prepare.sql (100%) rename dt-tests/tests/pg_to_pg/struct/{batch_test_2 => batch_test/bench_test_1}/src_to_dst/task_config.ini (97%) diff --git a/dt-common/src/config/task_config.rs b/dt-common/src/config/task_config.rs index 19f0a196..2f16dddf 100644 --- a/dt-common/src/config/task_config.rs +++ b/dt-common/src/config/task_config.rs @@ -177,7 +177,7 @@ impl TaskConfig { url, db: String::new(), dbs: Vec::new(), - db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 1000), + db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 1), }, ExtractType::FoxlakeS3 => { @@ -237,7 +237,7 @@ impl TaskConfig { schema: String::new(), schemas: Vec::new(), do_global_structs: false, - db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 100), + db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 1), }, _ => bail! { not_supported_err }, diff --git a/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs b/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs index 28d1399f..191f9d76 100644 --- a/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs +++ b/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs @@ -1,4 +1,8 @@ +use std::collections::HashSet; + +use anyhow::bail; use async_trait::async_trait; +use dt_common::error::Error; use dt_common::meta::struct_meta::struct_data::StructData; use dt_common::{log_info, rdb_filter::RdbFilter}; @@ -17,7 +21,6 @@ use crate::{ pub struct MysqlStructExtractor { pub base_extractor: BaseExtractor, pub conn_pool: Pool, - pub db: String, pub dbs: Vec, pub filter: RdbFilter, pub db_batch_size: usize, @@ -26,12 +29,16 @@ pub struct MysqlStructExtractor { #[async_trait] impl Extractor for MysqlStructExtractor { async fn extract(&mut self) -> anyhow::Result<()> { - log_info!( - "MysqlStructExtractor starts, schemas: {}", - self.dbs.join(",") - ); - for db in self.dbs.clone().iter() { - self.extract_internal(db).await?; + log_info!("MysqlStructExtractor starts..."); + let db_chunks: Vec> = self + .dbs + .chunks(self.db_batch_size) + .map(|chunk| chunk.to_vec()) + .collect(); + for db_chunk in db_chunks.into_iter() { + log_info!("MysqlStructExtractor extracts dbs: {}", db_chunk.join(",")); + self.extract_internal(db_chunk.into_iter().collect()) + .await?; } self.base_extractor.wait_task_finish().await } @@ -42,22 +49,24 @@ impl Extractor for MysqlStructExtractor { } impl MysqlStructExtractor { - pub async fn extract_internal(&mut self, db: &String) -> anyhow::Result<()> { + pub async fn extract_internal(&mut self, dbs: HashSet) -> anyhow::Result<()> { let meta_manager = MysqlMetaManager::new(self.conn_pool.clone()).await?; let mut fetcher = MysqlStructFetcher { conn_pool: self.conn_pool.to_owned(), - db: db.clone(), + dbs: dbs, filter: Some(self.filter.to_owned()), meta_manager, }; // database - let database_statement = fetcher.get_create_database_statement().await?; - self.push_dt_data(StructStatement::MysqlCreateDatabase(database_statement)) - .await?; + let database_statements = fetcher.get_create_database_statements("").await?; + for database_statement in database_statements { + self.push_dt_data(StructStatement::MysqlCreateDatabase(database_statement)) + .await?; + } // tables - for table_statement in fetcher.get_create_table_statements("").await? { + for table_statement in fetcher.get_create_table_statements("", "").await? 
{ self.push_dt_data(StructStatement::MysqlCreateTable(table_statement)) .await?; } @@ -66,9 +75,19 @@ impl MysqlStructExtractor { pub async fn push_dt_data(&mut self, statement: StructStatement) -> anyhow::Result<()> { let struct_data = StructData { - schema: self.db.clone(), + schema: "".to_string(), statement, }; self.base_extractor.push_struct(struct_data).await } + + pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<()> { + let max_db_batch_size = 1000; + let min_db_batch_size = 1; + if db_batch_size < min_db_batch_size || db_batch_size > max_db_batch_size { + bail! {Error::ConfigError(format!(r#"db_batch_size {} is not valid, should be in range ({}, {})"#, db_batch_size, min_db_batch_size, max_db_batch_size))} + } else { + Ok(()) + } + } } diff --git a/dt-connector/src/extractor/pg/pg_struct_extractor.rs b/dt-connector/src/extractor/pg/pg_struct_extractor.rs index aca1845a..809dd974 100644 --- a/dt-connector/src/extractor/pg/pg_struct_extractor.rs +++ b/dt-connector/src/extractor/pg/pg_struct_extractor.rs @@ -96,7 +96,7 @@ impl PgStructExtractor { } pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<()> { - let max_db_batch_size = 100; + let max_db_batch_size = 1000; let min_db_batch_size = 1; if db_batch_size < min_db_batch_size || db_batch_size > max_db_batch_size { bail! {Error::ConfigError(format!(r#"db_batch_size {} is not valid, should be in range ({}, {})"#, db_batch_size, min_db_batch_size, max_db_batch_size))} diff --git a/dt-connector/src/meta_fetcher/mysql/mysql_struct_fetcher.rs b/dt-connector/src/meta_fetcher/mysql/mysql_struct_fetcher.rs index c260f609..e8326af9 100644 --- a/dt-connector/src/meta_fetcher/mysql/mysql_struct_fetcher.rs +++ b/dt-connector/src/meta_fetcher/mysql/mysql_struct_fetcher.rs @@ -26,38 +26,42 @@ use sqlx::{mysql::MySqlRow, MySql, Pool, Row}; pub struct MysqlStructFetcher { pub conn_pool: Pool, - pub db: String, + pub dbs: HashSet, pub filter: Option, pub meta_manager: MysqlMetaManager, } impl MysqlStructFetcher { - pub async fn get_create_database_statement( + pub async fn get_create_database_statements( &mut self, - ) -> anyhow::Result { - let database = self.get_database().await?; - Ok(MysqlCreateDatabaseStatement { database }) + db: &str, + ) -> anyhow::Result> { + let databases = self.get_databases(db).await?; + Ok(databases + .into_iter() + .map(|d| MysqlCreateDatabaseStatement { database: d }) + .collect()) } pub async fn get_create_table_statements( &mut self, + db: &str, tb: &str, ) -> anyhow::Result> { let mut results = Vec::new(); - let tables = self.get_tables(tb).await?; - let mut indexes = self.get_indexes(tb).await?; - let mut check_constraints = self.get_check_constraints(tb).await?; - let mut foreign_key_constraints = self.get_foreign_key_constraints(tb).await?; + let tables = self.get_tables(db, tb).await?; + let mut indexes = self.get_indexes(db, tb).await?; + let mut check_constraints = self.get_check_constraints(db, tb).await?; + let mut foreign_key_constraints = self.get_foreign_key_constraints(db, tb).await?; - for (table_name, table) in tables { - let mut constraints = self.get_result(&mut check_constraints, &table_name); - constraints - .extend_from_slice(&self.get_result(&mut foreign_key_constraints, &table_name)); + for (key, table) in tables { + let mut constraints = self.get_result(&mut check_constraints, &key); + constraints.extend_from_slice(&self.get_result(&mut foreign_key_constraints, &key)); let statement = MysqlCreateTableStatement { table, constraints, - indexes: 
self.get_result(&mut indexes, &table_name), + indexes: self.get_result(&mut indexes, &key), }; results.push(statement); } @@ -65,42 +69,85 @@ impl MysqlStructFetcher { } // Create Database: https://dev.mysql.com/doc/refman/8.0/en/create-database.html - async fn get_database(&mut self) -> anyhow::Result { + async fn get_databases(&mut self, db: &str) -> anyhow::Result> { + let (db_filter, target_dbs) = if !db.is_empty() { + if !self.dbs.contains(db) { + return Ok(Vec::new()); + } + ( + format!("SCHEMA_NAME = '{}'", db), + HashSet::from([db.to_string()]), + ) + } else if !self.dbs.is_empty() { + ( + format!("SCHEMA_NAME IN ({})", self.get_dbs_str()), + self.dbs.clone(), + ) + } else { + return Ok(Vec::new()); + }; + let sql = format!( "SELECT SCHEMA_NAME, DEFAULT_CHARACTER_SET_NAME, DEFAULT_COLLATION_NAME FROM information_schema.schemata - WHERE SCHEMA_NAME = '{}'", - self.db + WHERE {}", + db_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); - if let Some(row) = rows.try_next().await? { + let mut dbs = HashMap::new(); + while let Some(row) = rows.try_next().await? { let schema_name = Self::get_str_with_null(&row, "SCHEMA_NAME")?; let default_character_set_name = Self::get_str_with_null(&row, "DEFAULT_CHARACTER_SET_NAME")?; let default_collation_name = Self::get_str_with_null(&row, "DEFAULT_COLLATION_NAME")?; let database = Database { - name: schema_name, + name: schema_name.clone(), default_character_set_name, default_collation_name, }; - return Ok(database); + dbs.insert(schema_name.clone(), database); } - bail! {Error::StructError(format!("db: {} not found", self.db))} + let filtered_dbs: Vec = target_dbs + .iter() + .filter(|&s| !dbs.contains_key(s)) + .cloned() + .collect(); + if !filtered_dbs.is_empty() { + bail! {Error::StructError(format!( + "dbs: {} not found", + filtered_dbs.join(",") + ))} + } + + Ok(dbs.into_iter().map(|(_, v)| v).collect()) } - async fn get_tables(&mut self, tb: &str) -> anyhow::Result> { - let mut results: BTreeMap = BTreeMap::new(); + async fn get_tables( + &mut self, + db: &str, + tb: &str, + ) -> anyhow::Result> { + let mut results: BTreeMap<(String, String), Table> = BTreeMap::new(); // Create Table: https://dev.mysql.com/doc/refman/8.0/en/create-table.html - let tb_filter = if !tb.is_empty() { - format!("AND t.TABLE_NAME = '{}'", tb) + let tb_filter = if !db.is_empty() { + if !self.dbs.contains(db) { + return Ok(results); + } + if !tb.is_empty() { + format!("t.TABLE_SCHEMA = '{}' AND t.TABLE_NAME = '{}'", db, tb) + } else { + format!("t.TABLE_SCHEMA = '{}'", db) + } + } else if !self.dbs.is_empty() { + format!("t.TABLE_SCHEMA IN ({})", self.get_dbs_str()) } else { - String::new() + return Ok(results); }; // BASE TABLE for a table, VIEW for a view, or SYSTEM VIEW for an INFORMATION_SCHEMA table. 
@@ -124,10 +171,10 @@ impl MysqlStructFetcher { FROM information_schema.tables t LEFT JOIN information_schema.columns c ON t.TABLE_SCHEMA = c.TABLE_SCHEMA AND t.TABLE_NAME = c.TABLE_NAME - WHERE t.TABLE_SCHEMA ='{}' {} + WHERE {} AND t.TABLE_TYPE = 'BASE TABLE' ORDER BY t.TABLE_SCHEMA, t.TABLE_NAME, c.ORDINAL_POSITION", - self.db, tb_filter + tb_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); @@ -137,10 +184,8 @@ impl MysqlStructFetcher { Self::get_str_with_null(&row, "TABLE_NAME")?, ); - if let Some(filter) = &mut self.filter { - if filter.filter_tb(&db, &tb) { - continue; - } + if self.filter_tb(&db, &tb) { + continue; } let engine_name = Self::get_str_with_null(&row, "ENGINE")?; @@ -170,15 +215,16 @@ impl MysqlStructFetcher { generated: None, }; - if let Some(table) = results.get_mut(&tb) { + let key = (db.clone(), tb.clone()); + if let Some(table) = results.get_mut(&key) { table.columns.push(column); } else { let table_collation = Self::get_str_with_null(&row, "TABLE_COLLATION")?; let charset = Self::get_charset_by_collation(&table_collation); results.insert( - tb.clone(), + key, Table { - database_name: db.clone(), + database_name: db, schema_name: String::new(), table_name: tb, engine_name, @@ -250,14 +296,28 @@ impl MysqlStructFetcher { Ok(ColumnDefault::Literal(str)) } - async fn get_indexes(&mut self, tb: &str) -> anyhow::Result>> { - let mut index_map: HashMap<(String, String), Index> = HashMap::new(); + async fn get_indexes( + &mut self, + db: &str, + tb: &str, + ) -> anyhow::Result>> { + let mut results: HashMap<(String, String), Vec> = HashMap::new(); + let mut index_map: HashMap<(String, String, String), Index> = HashMap::new(); // Create Index: https://dev.mysql.com/doc/refman/8.0/en/create-index.html - let tb_filter = if !tb.is_empty() { - format!("AND TABLE_NAME = '{}'", tb) + let tb_filter = if !db.is_empty() { + if !self.dbs.contains(db) { + return Ok(results); + } + if !tb.is_empty() { + format!("TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}'", db, tb) + } else { + format!("TABLE_SCHEMA = '{}'", db) + } + } else if !self.dbs.is_empty() { + format!("TABLE_SCHEMA IN ({})", self.get_dbs_str()) } else { - String::new() + return Ok(results); }; let sql = format!( @@ -270,14 +330,15 @@ impl MysqlStructFetcher { INDEX_TYPE, COMMENT FROM information_schema.statistics - WHERE INDEX_NAME != '{}' AND TABLE_SCHEMA ='{}' {} + WHERE INDEX_NAME != '{}' AND {} ORDER BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX", - "PRIMARY", self.db, tb_filter + "PRIMARY", tb_filter ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); while let Some(row) = rows.try_next().await? 
{ - let (table_name, index_name) = ( + let (table_schema, table_name, index_name) = ( + Self::get_str_with_null(&row, "TABLE_SCHEMA")?, Self::get_str_with_null(&row, "TABLE_NAME")?, Self::get_str_with_null(&row, "INDEX_NAME")?, ); @@ -292,7 +353,7 @@ impl MysqlStructFetcher { seq_in_index, }; - let key = (table_name.clone(), index_name.clone()); + let key = (table_schema.clone(), table_name.clone(), index_name.clone()); if let Some(index) = index_map.get_mut(&key) { index.columns.push(column); } else { @@ -314,9 +375,8 @@ impl MysqlStructFetcher { } } - let mut results: HashMap> = HashMap::new(); - for ((tb, _index_name), index) in index_map { - self.push_to_results(&mut results, &tb, index); + for ((db, tb, _index_name), index) in index_map { + self.push_to_results(&mut results, &db, &tb, index); } Ok(results) @@ -324,9 +384,10 @@ impl MysqlStructFetcher { async fn get_check_constraints( &mut self, + db: &str, tb: &str, - ) -> anyhow::Result>> { - let mut results: HashMap> = HashMap::new(); + ) -> anyhow::Result>> { + let mut results: HashMap<(String, String), Vec> = HashMap::new(); // information_schema.check_constraints was introduced from MySQL 8.0.16 // also, many MySQL-like databases: PolarDB/PolarX doesn't have check_constraints let info_tbs = self.get_information_schema_tables().await?; @@ -335,10 +396,22 @@ impl MysqlStructFetcher { } // Check Constraint: https://dev.mysql.com/doc/refman/8.0/en/create-table-check-constraints.html - let tb_filter = if !tb.is_empty() { - format!("AND tc.TABLE_NAME = '{}'", tb) + let tb_filter = if !db.is_empty() { + if !self.dbs.contains(db) { + return Ok(results); + } + if !tb.is_empty() { + format!( + "tc.CONSTRAINT_SCHEMA = '{}' AND tc.TABLE_NAME = '{}'", + db, tb + ) + } else { + format!("tc.CONSTRAINT_SCHEMA = '{}'", db) + } + } else if !self.dbs.is_empty() { + format!("tc.CONSTRAINT_SCHEMA IN ({})", self.get_dbs_str()) } else { - String::new() + return Ok(results); }; let constraint_type_str = ConstraintType::Check.to_str(DbType::Mysql); @@ -352,9 +425,9 @@ impl MysqlStructFetcher { FROM information_schema.table_constraints tc LEFT JOIN information_schema.check_constraints cc ON tc.CONSTRAINT_SCHEMA = cc.CONSTRAINT_SCHEMA AND tc.CONSTRAINT_NAME = cc.CONSTRAINT_NAME - WHERE tc.CONSTRAINT_SCHEMA = '{}' {} + WHERE {} AND tc.CONSTRAINT_TYPE='{}' ", - self.db, tb_filter, constraint_type_str + tb_filter, constraint_type_str ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); @@ -365,14 +438,14 @@ impl MysqlStructFetcher { let check_clause = Self::get_str_with_null(&row, "CHECK_CLAUSE")?; let definition = self.unescape(check_clause).await?; let constraint = Constraint { - database_name, + database_name: database_name.clone(), schema_name: String::new(), table_name: table_name.clone(), constraint_name, constraint_type: ConstraintType::Check, definition, }; - self.push_to_results(&mut results, &table_name, constraint); + self.push_to_results(&mut results, &database_name, &table_name, constraint); } Ok(results) @@ -380,15 +453,28 @@ impl MysqlStructFetcher { async fn get_foreign_key_constraints( &mut self, + db: &str, tb: &str, - ) -> anyhow::Result>> { - let mut results: HashMap> = HashMap::new(); + ) -> anyhow::Result>> { + let mut results: HashMap<(String, String), Vec> = HashMap::new(); // Check Constraint: https://dev.mysql.com/doc/refman/8.0/en/create-table-check-constraints.html - let tb_filter = if !tb.is_empty() { - format!("AND kcu.TABLE_NAME = '{}'", tb) + let tb_filter = if !db.is_empty() { + if !self.dbs.contains(db) { + return 
Ok(results); + } + if !tb.is_empty() { + format!( + "kcu.CONSTRAINT_SCHEMA = '{}' AND kcu.TABLE_NAME = '{}'", + db, tb + ) + } else { + format!("kcu.CONSTRAINT_SCHEMA = '{}'", db) + } + } else if !self.dbs.is_empty() { + format!("kcu.CONSTRAINT_SCHEMA IN ({})", self.get_dbs_str()) } else { - String::new() + return Ok(results); }; let constraint_type_str = ConstraintType::Foreign.to_str(DbType::Mysql); @@ -406,10 +492,10 @@ impl MysqlStructFetcher { JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME AND kcu.CONSTRAINT_SCHEMA=tc.CONSTRAINT_SCHEMA WHERE - kcu.CONSTRAINT_SCHEMA = '{}' - AND kcu.REFERENCED_TABLE_SCHEMA = '{}' {} + {} + AND kcu.REFERENCED_TABLE_SCHEMA = kcu.CONSTRAINT_SCHEMA AND tc.CONSTRAINT_TYPE = '{}'", - self.db, self.db, tb_filter, constraint_type_str, + tb_filter, constraint_type_str, ); let mut rows = sqlx::query(&sql).fetch(&self.conn_pool); @@ -425,14 +511,14 @@ impl MysqlStructFetcher { column_name, database_name, referenced_table_name, referenced_column_name ); let constraint = Constraint { - database_name, + database_name: database_name.clone(), schema_name: String::new(), table_name: table_name.clone(), constraint_name, constraint_type: ConstraintType::Foreign, definition, }; - self.push_to_results(&mut results, &table_name, constraint); + self.push_to_results(&mut results, &database_name, &table_name, constraint); } Ok(results) } @@ -455,9 +541,12 @@ impl MysqlStructFetcher { Ok(String::new()) } - fn filter_tb(&mut self, tb: &str) -> bool { + fn filter_tb(&mut self, db: &str, tb: &str) -> bool { + if !self.dbs.contains(db) { + return true; + } if let Some(filter) = &mut self.filter { - return filter.filter_tb(&self.db, tb); + return filter.filter_tb(db, tb); } false } @@ -509,22 +598,36 @@ impl MysqlStructFetcher { fn push_to_results( &mut self, - results: &mut HashMap>, + results: &mut HashMap<(String, String), Vec>, + table_schema: &str, table_name: &str, item: T, ) { - if self.filter_tb(table_name) { + if self.filter_tb(table_schema, table_name) { return; } - if let Some(exists) = results.get_mut(table_name) { + let key = (table_schema.into(), table_name.into()); + if let Some(exists) = results.get_mut(&key) { exists.push(item); } else { - results.insert(table_name.into(), vec![item]); + results.insert(key, vec![item]); } } - fn get_result(&self, results: &mut HashMap>, table_name: &str) -> Vec { - results.remove(table_name).unwrap_or_default() + fn get_result( + &self, + results: &mut HashMap<(String, String), Vec>, + key: &(String, String), + ) -> Vec { + results.remove(key).unwrap_or_default() + } + + fn get_dbs_str(&self) -> String { + self.dbs + .iter() + .map(|s| format!("'{}'", s)) + .collect::>() + .join(",") } } diff --git a/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs b/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs index 07330975..c860327d 100644 --- a/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs +++ b/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs @@ -468,11 +468,12 @@ impl PgStructFetcher { ..Default::default() }; - if let Some(table) = results.get_mut(&(table_schema.clone(), table_name.clone())) { + let key = (table_schema.clone(), table_name.clone()); + if let Some(table) = results.get_mut(&key) { table.columns.push(column); } else { results.insert( - (table_schema.clone(), table_name.clone()), + key, Table { database_name: table_schema.clone(), schema_name: table_schema, @@ -1217,10 +1218,11 @@ impl PgStructFetcher { return; } - if let Some(exists) = 
results.get_mut(&(schema_name.to_owned(), table_name.to_owned())) { + let key = (schema_name.into(), table_name.into()); + if let Some(exists) = results.get_mut(&key) { exists.push(item); } else { - results.insert((schema_name.into(), table_name.into()), vec![item]); + results.insert(key, vec![item]); } } diff --git a/dt-connector/src/sinker/mysql/mysql_checker.rs b/dt-connector/src/sinker/mysql/mysql_checker.rs index 5246a756..0c5371a3 100644 --- a/dt-connector/src/sinker/mysql/mysql_checker.rs +++ b/dt-connector/src/sinker/mysql/mysql_checker.rs @@ -1,4 +1,8 @@ -use std::{cmp, collections::HashMap, sync::Arc}; +use std::{ + cmp, + collections::{HashMap, HashSet}, + sync::Arc, +}; use async_trait::async_trait; use futures::TryStreamExt; @@ -168,20 +172,21 @@ impl MysqlChecker { let mut struct_fetcher = MysqlStructFetcher { conn_pool: self.conn_pool.to_owned(), - db, + dbs: HashSet::from([db.clone()]), filter: None, meta_manager: self.meta_manager.clone(), }; let mut dst_statement = match &src_statement { StructStatement::MysqlCreateDatabase(_) => { - let dst_statement = struct_fetcher.get_create_database_statement().await?; - StructStatement::MysqlCreateDatabase(dst_statement) + let mut dst_statement = + struct_fetcher.get_create_database_statements(&db).await?; + StructStatement::MysqlCreateDatabase(dst_statement.remove(0)) } StructStatement::MysqlCreateTable(s) => { let mut dst_statement = struct_fetcher - .get_create_table_statements(&s.table.table_name) + .get_create_table_statements(&s.table.database_name, &s.table.table_name) .await?; if dst_statement.is_empty() { StructStatement::Unknown diff --git a/dt-task/src/extractor_util.rs b/dt-task/src/extractor_util.rs index 25146c6b..20b63301 100644 --- a/dt-task/src/extractor_util.rs +++ b/dt-task/src/extractor_util.rs @@ -334,17 +334,16 @@ impl ExtractorUtil { ExtractorConfig::MysqlStruct { url, - db, dbs, db_batch_size, .. 
} => { + MysqlStructExtractor::validate_db_batch_size(db_batch_size)?; // TODO, pass max_connections as parameter let conn_pool = TaskUtil::create_mysql_conn_pool(&url, 2, enable_sqlx_log, false).await?; let extractor = MysqlStructExtractor { conn_pool, - db, dbs, filter, base_extractor, diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/basic_test/dst_prepare.sql similarity index 100% rename from dt-tests/tests/mysql_to_mysql/struct/batch_test/dst_prepare.sql rename to dt-tests/tests/mysql_to_mysql/struct/batch_test/basic_test/dst_prepare.sql diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/expect_ddl_8.0.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/basic_test/expect_ddl_8.0.sql similarity index 100% rename from dt-tests/tests/mysql_to_mysql/struct/batch_test/expect_ddl_8.0.sql rename to dt-tests/tests/mysql_to_mysql/struct/batch_test/basic_test/expect_ddl_8.0.sql diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/basic_test/src_prepare.sql similarity index 100% rename from dt-tests/tests/mysql_to_mysql/struct/batch_test/src_prepare.sql rename to dt-tests/tests/mysql_to_mysql/struct/batch_test/basic_test/src_prepare.sql diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/task_config.ini b/dt-tests/tests/mysql_to_mysql/struct/batch_test/basic_test/task_config.ini similarity index 76% rename from dt-tests/tests/mysql_to_mysql/struct/batch_test/task_config.ini rename to dt-tests/tests/mysql_to_mysql/struct/batch_test/basic_test/task_config.ini index f29a983f..806151a1 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/batch_test/task_config.ini +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/basic_test/task_config.ini @@ -2,7 +2,7 @@ extract_type=struct db_type=mysql url={mysql_extractor_url_8_0} -batch_size=2 +db_batch_size=3 [sinker] sink_type=struct @@ -18,6 +18,11 @@ do_dbs=struct_it_mysql2mysql_* parallel_type=serial parallel_size=1 +[runtime] +log_level=info +log4rs_file=./log4rs.yaml +log_dir=./logs + [pipeline] checkpoint_interval_secs=1 -buffer_size=100 \ No newline at end of file +buffer_size=1000 \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/check/task_config.ini b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/check/task_config.ini new file mode 100644 index 00000000..a74f02c6 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/check/task_config.ini @@ -0,0 +1,27 @@ +[extractor] +extract_type=struct +db_type=mysql +url={mysql_extractor_url_8_0} +db_batch_size=1 + +[sinker] +sink_type=check +db_type=mysql +batch_size=1 +url={mysql_sinker_url_8_0} + +[filter] +do_dbs=struct_it_mysql2mysql_* + +[parallelizer] +parallel_type=serial +parallel_size=1 + +[runtime] +log_level=error +log4rs_file=./log4rs.yaml +log_dir=./logs + +[pipeline] +checkpoint_interval_secs=1 +buffer_size=1000 \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql new file mode 100644 index 00000000..10026696 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql @@ -0,0 +1,35 @@ +``` +create database if not exists struct_it_mysql2mysql_0 CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci; +use struct_it_mysql2mysql_0; 
+ +DROP PROCEDURE IF EXISTS SetupTestDatabases; +CREATE PROCEDURE SetupTestDatabases() +BEGIN + -- Declare variables for the loop counter and database name. + DECLARE i INT DEFAULT 1; + DECLARE db_name VARCHAR(255); + + WHILE i <= 10 DO + -- Construct the database name, e.g., 'struct_it_mysql2mysql_1' + SET db_name = CONCAT('struct_it_mysql2mysql_', i); + + -- ==================================================================== + -- Use PREPARE and EXECUTE to run dynamic SQL statements. + -- This is the standard way to execute dynamic DDL (like CREATE, DROP) + -- in a stored procedure. + -- ==================================================================== + + -- 1. Dynamically Drop Database + -- Build the SQL command into a string. + SET @sql_command = CONCAT('DROP DATABASE IF EXISTS `', db_name, '`'); + -- Prepare the statement. + PREPARE stmt FROM @sql_command; + -- Execute the statement. + EXECUTE stmt; + -- Deallocate the prepared statement. + DEALLOCATE PREPARE stmt; + -- Increment the loop counter. + SET i = i + 1; + END WHILE; +END +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql new file mode 100644 index 00000000..f4721102 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql @@ -0,0 +1,60 @@ +``` + +create database if not exists struct_it_mysql2mysql_0 CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci; +use struct_it_mysql2mysql_0; + +DROP PROCEDURE IF EXISTS SetupTestDatabases; +CREATE PROCEDURE SetupTestDatabases() +BEGIN + -- Declare variables for the loop counter and database name. + DECLARE i INT DEFAULT 1; + DECLARE db_name VARCHAR(255); + + WHILE i <= 10 DO + -- Construct the database name, e.g., 'struct_it_mysql2mysql_1' + SET db_name = CONCAT('struct_it_mysql2mysql_', i); + + -- ==================================================================== + -- Use PREPARE and EXECUTE to run dynamic SQL statements. + -- This is the standard way to execute dynamic DDL (like CREATE, DROP) + -- in a stored procedure. + -- ==================================================================== + + -- 1. Dynamically Drop Database + -- Build the SQL command into a string. + SET @sql_command = CONCAT('DROP DATABASE IF EXISTS `', db_name, '`'); + -- Prepare the statement. + PREPARE stmt FROM @sql_command; + -- Execute the statement. + EXECUTE stmt; + -- Deallocate the prepared statement. + DEALLOCATE PREPARE stmt; + + -- 2. Dynamically Create Database + SET @sql_command = CONCAT('CREATE DATABASE IF NOT EXISTS `', db_name, '` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci'); + PREPARE stmt FROM @sql_command; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + + -- 3. Dynamically Create Table + -- Note: Single quotes inside the string literal are escaped by doubling them (''). + SET @sql_command = CONCAT(' + CREATE TABLE `', db_name, '`.`expression_defaults` ( + `i` INT DEFAULT 0, + `c` VARCHAR(10) DEFAULT \'\', + `f` FLOAT DEFAULT (RAND() * RAND()), + `b` BINARY(16) DEFAULT (UUID_TO_BIN(UUID())), + `d` DATE DEFAULT (CURRENT_DATE + INTERVAL 1 YEAR), + `p` POINT DEFAULT (Point(0,0)), + `j` JSON DEFAULT (JSON_ARRAY()) + ) + '); + PREPARE stmt FROM @sql_command; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + + -- Increment the loop counter. 
+ SET i = i + 1; + END WHILE; +END +``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/task_config.ini b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/task_config.ini new file mode 100644 index 00000000..84ad9623 --- /dev/null +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/task_config.ini @@ -0,0 +1,28 @@ +[extractor] +extract_type=struct +db_type=mysql +url={mysql_extractor_url_8_0} +db_batch_size=10 + +[sinker] +sink_type=struct +db_type=mysql +batch_size=1 +url={mysql_sinker_url_8_0} +conflict_policy=interrupt + +[filter] +do_dbs=struct_it_mysql2mysql_* + +[parallelizer] +parallel_type=serial +parallel_size=1 + +[runtime] +log_level=error +log4rs_file=./log4rs.yaml +log_dir=./logs + +[pipeline] +checkpoint_interval_secs=1 +buffer_size=1000 \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct_tests.rs b/dt-tests/tests/mysql_to_mysql/struct_tests.rs index a1578bcf..f6fb5cc0 100644 --- a/dt-tests/tests/mysql_to_mysql/struct_tests.rs +++ b/dt-tests/tests/mysql_to_mysql/struct_tests.rs @@ -3,7 +3,7 @@ mod test { use serial_test::serial; - use crate::test_runner::test_base::TestBase; + use crate::test_runner::{rdb_struct_test_runner::RdbStructTestRunner, test_base::TestBase}; #[tokio::test] #[serial] @@ -51,7 +51,18 @@ mod test { #[tokio::test] #[serial] - async fn struct_batch_test() { - TestBase::run_mysql_struct_test("mysql_to_mysql/struct/batch_test").await; + async fn struct_batch_basic_test() { + TestBase::run_mysql_struct_test("mysql_to_mysql/struct/batch_test/basic_test").await; + } + + #[tokio::test] + #[serial] + async fn struct_batch_bench_test_1() { + let mut runner = + RdbStructTestRunner::new("mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst") + .await + .unwrap(); + runner.run_struct_test_without_check().await.unwrap(); + // TestBase::run_check_test("mysql_to_mysql/struct/batch_test/bench_test_1/check").await; } } diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_1/dst_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/basic_test/dst_clean.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test_1/dst_clean.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/basic_test/dst_clean.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_1/dst_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/basic_test/dst_prepare.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test_1/dst_prepare.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/basic_test/dst_prepare.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_1/src_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/basic_test/src_clean.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test_1/src_clean.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/basic_test/src_clean.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_1/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/basic_test/src_prepare.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test_1/src_prepare.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/basic_test/src_prepare.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_1/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/basic_test/task_config.ini similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test_1/task_config.ini rename to 
dt-tests/tests/pg_to_pg/struct/batch_test/basic_test/task_config.ini diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/check/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/check/task_config.ini similarity index 96% rename from dt-tests/tests/pg_to_pg/struct/batch_test_2/check/task_config.ini rename to dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/check/task_config.ini index 220a2620..0ea5ad41 100644 --- a/dt-tests/tests/pg_to_pg/struct/batch_test_2/check/task_config.ini +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/check/task_config.ini @@ -2,7 +2,7 @@ db_type=pg extract_type=struct url={pg_extractor_url} -db_batch_size=10 +db_batch_size=1 [sinker] db_type=pg diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/dst_clean.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_clean.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/dst_clean.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/dst_prepare.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/src_clean.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_clean.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/src_clean.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql similarity index 100% rename from dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/src_prepare.sql rename to dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/task_config.ini similarity index 97% rename from dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/task_config.ini rename to dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/task_config.ini index 2a5fe98b..ec4e3329 100644 --- a/dt-tests/tests/pg_to_pg/struct/batch_test_2/src_to_dst/task_config.ini +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_1/src_to_dst/task_config.ini @@ -36,4 +36,4 @@ parallel_size=1 [pipeline] checkpoint_interval_secs=1 -buffer_size=100 \ No newline at end of file +buffer_size=1000 \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct_tests.rs b/dt-tests/tests/pg_to_pg/struct_tests.rs index 9354d1bb..228441e3 100644 --- a/dt-tests/tests/pg_to_pg/struct_tests.rs +++ b/dt-tests/tests/pg_to_pg/struct_tests.rs @@ -53,17 +53,18 @@ mod test { #[tokio::test] #[serial] - async fn struct_batch_test_1() { - TestBase::run_pg_struct_test("pg_to_pg/struct/batch_test_1").await; + async fn struct_batch_basic_test() { + TestBase::run_pg_struct_test("pg_to_pg/struct/batch_test/basic_test").await; } #[tokio::test] #[serial] - async fn struct_batch_test_2() { - let mut runner = RdbStructTestRunner::new("pg_to_pg/struct/batch_test_2/src_to_dst") - .await - 
.unwrap(); + async fn struct_batch_bench_test_1() { + let mut runner = + RdbStructTestRunner::new("pg_to_pg/struct/batch_test/bench_test_1/src_to_dst") + .await + .unwrap(); runner.run_struct_test_without_check().await.unwrap(); - // TestBase::run_check_test("pg_to_pg/struct/batch_test_2/check").await; + // TestBase::run_check_test("pg_to_pg/struct/batch_test/bench_test_1/check").await; } } From 9dadd4eff02a841fb557de251672f472d983fb89 Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Thu, 25 Sep 2025 17:31:00 +0800 Subject: [PATCH 12/17] fix test --- .../struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql | 4 +++- .../struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql index 10026696..88bb63df 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql @@ -31,5 +31,7 @@ BEGIN -- Increment the loop counter. SET i = i + 1; END WHILE; -END +END; + +CALL SetupTestDatabases(); ``` \ No newline at end of file diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql index f4721102..ff378986 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql @@ -56,5 +56,7 @@ BEGIN -- Increment the loop counter. 
SET i = i + 1; END WHILE; -END +END; + +CALL SetupTestDatabases(); ``` \ No newline at end of file From e268cd26bab6df581ea0962ce08f86a5483dca47 Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Thu, 25 Sep 2025 17:46:26 +0800 Subject: [PATCH 13/17] fmt --- dt-task/src/task_runner.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dt-task/src/task_runner.rs b/dt-task/src/task_runner.rs index 266b819f..baaa107f 100644 --- a/dt-task/src/task_runner.rs +++ b/dt-task/src/task_runner.rs @@ -897,7 +897,7 @@ impl TaskRunner { self.task_monitor .add_no_window_metrics(TaskMetricsType::TotalProgressCount, tbs.len() as u64); let mut finished_tbs = 0; - + for tb in tbs.iter() { if snapshot_resumer.check_finished(schema, tb) { log_info!("schema: {}, tb: {}, already finished", schema, tb); @@ -971,9 +971,11 @@ impl TaskRunner { id: format!("{}.{}", schema, tb), }); } - - self.task_monitor - .add_no_window_metrics(TaskMetricsType::FinishedProgressCount, finished_tbs as u64); + + self.task_monitor.add_no_window_metrics( + TaskMetricsType::FinishedProgressCount, + finished_tbs as u64, + ); } } Ok(pending_tasks) From a57decbdf34cbb6dd15afe90b6bdbd5b86780728 Mon Sep 17 00:00:00 2001 From: gnolong <2391353625@qq.com> Date: Thu, 25 Sep 2025 19:49:04 +0800 Subject: [PATCH 14/17] test --- .../bench_test_1/src_to_dst/dst_prepare.sql | 2 +- .../bench_test_1/src_to_dst/src_prepare.sql | 2 +- .../bench_test_1/src_to_dst/task_config.ini | 2 +- .../bench_test_2/check/task_config.ini | 36 +++++++++++++ .../bench_test_2/src_to_dst/dst_clean.sql | 1 + .../bench_test_2/src_to_dst/dst_prepare.sql | 15 ++++++ .../bench_test_2/src_to_dst/src_clean.sql | 1 + .../bench_test_2/src_to_dst/src_prepare.sql | 51 +++++++++++++++++++ .../bench_test_2/src_to_dst/task_config.ini | 39 ++++++++++++++ dt-tests/tests/pg_to_pg/struct_tests.rs | 11 ++++ 10 files changed, 157 insertions(+), 3 deletions(-) create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/check/task_config.ini create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/dst_clean.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/dst_prepare.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/src_clean.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/src_prepare.sql create mode 100644 dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/task_config.ini diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql index 88bb63df..c584b7c8 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/dst_prepare.sql @@ -9,7 +9,7 @@ BEGIN DECLARE i INT DEFAULT 1; DECLARE db_name VARCHAR(255); - WHILE i <= 10 DO + WHILE i <= 100 DO -- Construct the database name, e.g., 'struct_it_mysql2mysql_1' SET db_name = CONCAT('struct_it_mysql2mysql_', i); diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql index ff378986..a5304654 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql +++ 
b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/src_prepare.sql @@ -10,7 +10,7 @@ BEGIN DECLARE i INT DEFAULT 1; DECLARE db_name VARCHAR(255); - WHILE i <= 10 DO + WHILE i <= 100 DO -- Construct the database name, e.g., 'struct_it_mysql2mysql_1' SET db_name = CONCAT('struct_it_mysql2mysql_', i); diff --git a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/task_config.ini b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/task_config.ini index 84ad9623..fc920c39 100644 --- a/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/task_config.ini +++ b/dt-tests/tests/mysql_to_mysql/struct/batch_test/bench_test_1/src_to_dst/task_config.ini @@ -2,7 +2,7 @@ extract_type=struct db_type=mysql url={mysql_extractor_url_8_0} -db_batch_size=10 +db_batch_size=100 [sinker] sink_type=struct diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/check/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/check/task_config.ini new file mode 100644 index 00000000..0ea5ad41 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/check/task_config.ini @@ -0,0 +1,36 @@ +[extractor] +db_type=pg +extract_type=struct +url={pg_extractor_url} +db_batch_size=1 + +[sinker] +db_type=pg +sink_type=check +url={pg_sinker_url} +batch_size=2 + +[filter] +do_dbs=struct_it_pg2pg_* +ignore_dbs= +do_tbs= +ignore_tbs= +do_events= + +[router] +db_map= +tb_map= +col_map= + +[parallelizer] +parallel_type=rdb_check +parallel_size=2 + +[pipeline] +buffer_size=4 +checkpoint_interval_secs=1 + +[runtime] +log_level=error +log4rs_file=./log4rs.yaml +log_dir=./logs \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/dst_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/dst_clean.sql new file mode 100644 index 00000000..e88cc948 --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/dst_clean.sql @@ -0,0 +1 @@ +-- drop schema if exists struct_it_pg2pg_1 CASCADE; \ No newline at end of file diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/dst_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/dst_prepare.sql new file mode 100644 index 00000000..ee29764c --- /dev/null +++ b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/dst_prepare.sql @@ -0,0 +1,15 @@ +-- Use a DO block and a FOR loop to create the schemas and their objects dynamically. 
+```
+DO $$
+DECLARE
+    i INT;
+    schema_name TEXT;
+BEGIN
+    FOR i IN 1..100 LOOP
+        schema_name := 'struct_it_pg2pg_' || i;
+
+        -- Drop Schemas
+        EXECUTE format('DROP SCHEMA IF EXISTS %I CASCADE', schema_name);
+    END LOOP;
+END $$;
+```
diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/src_clean.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/src_clean.sql
new file mode 100644
index 00000000..e88cc948
--- /dev/null
+++ b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/src_clean.sql
@@ -0,0 +1 @@
+-- drop schema if exists struct_it_pg2pg_1 CASCADE;
\ No newline at end of file
diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/src_prepare.sql b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/src_prepare.sql
new file mode 100644
index 00000000..9c3ce5c2
--- /dev/null
+++ b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/src_prepare.sql
@@ -0,0 +1,51 @@
+-- Use a DO block and a FOR loop to create the schemas and their objects dynamically.
+```
+DO $$
+DECLARE
+    i INT;
+    schema_name TEXT;
+BEGIN
+
+    FOR i IN 1..100 LOOP
+        schema_name := 'struct_it_pg2pg_' || i;
+
+        -- Drop and Create the Schema
+        EXECUTE format('DROP SCHEMA IF EXISTS %I CASCADE', schema_name);
+        EXECUTE format('CREATE SCHEMA %I', schema_name);
+
+        -- all basic column types:
+        EXECUTE format('
+            CREATE TABLE %I.full_column_type (
+                id SERIAL PRIMARY KEY,
+                varchar_col VARCHAR(255) NOT NULL,
+                char_col CHAR(10),
+                text_col TEXT,
+                boolean_col BOOLEAN,
+                smallint_col SMALLINT,
+                integer_col INTEGER,
+                bigint_col BIGINT,
+                decimal_col DECIMAL(10, 2),
+                numeric_col NUMERIC(10, 2),
+                real_col REAL,
+                double_precision_col DOUBLE PRECISION,
+                date_col DATE,
+                time_col TIME,
+                timestamp_col TIMESTAMP,
+                interval_col INTERVAL,
+                bytea_col BYTEA,
+                uuid_col UUID,
+                xml_col XML,
+                json_col JSON,
+                jsonb_col JSONB,
+                point_col POINT,
+                line_col LINE,
+                lseg_col LSEG,
+                box_col BOX,
+                path_col PATH,
+                polygon_col POLYGON,
+                circle_col CIRCLE
+            )', schema_name);
+
+    END LOOP;
+END $$;
+```
\ No newline at end of file
diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/task_config.ini
new file mode 100644
index 00000000..ec4e3329
--- /dev/null
+++ b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/task_config.ini
@@ -0,0 +1,39 @@
+[extractor]
+extract_type=struct
+db_type=pg
+url={pg_extractor_url}
+db_batch_size=10
+
+[sinker]
+sink_type=struct
+db_type=pg
+batch_size=1
+url={pg_sinker_url}
+# conflict_policy=interrupt
+conflict_policy=ignore
+
+[filter]
+do_dbs=struct_it_pg2pg_*
+ignore_dbs=
+do_tbs=
+ignore_tbs=
+do_events=
+# do_structures=database,table,constraint,sequence,comment,index
+
+[router]
+db_map=
+tb_map=
+col_map=
+
+[runtime]
+log_level=error
+log4rs_file=./log4rs.yaml
+log_dir=./logs
+
+[parallelizer]
+parallel_type=serial
+parallel_size=1
+
+[pipeline]
+checkpoint_interval_secs=1
+buffer_size=1000
\ No newline at end of file
diff --git a/dt-tests/tests/pg_to_pg/struct_tests.rs b/dt-tests/tests/pg_to_pg/struct_tests.rs
index 228441e3..e4a413b3 100644
--- a/dt-tests/tests/pg_to_pg/struct_tests.rs
+++ b/dt-tests/tests/pg_to_pg/struct_tests.rs
@@ -67,4 +67,15 @@ mod test {
         runner.run_struct_test_without_check().await.unwrap();
         // TestBase::run_check_test("pg_to_pg/struct/batch_test/bench_test_1/check").await;
     }
+
+    #[tokio::test]
+    #[serial]
+    async fn struct_batch_bench_test_2() {
+        let mut runner =
+            RdbStructTestRunner::new("pg_to_pg/struct/batch_test/bench_test_2/src_to_dst")
+                .await
+                .unwrap();
+        runner.run_struct_test_without_check().await.unwrap();
+        // TestBase::run_check_test("pg_to_pg/struct/batch_test/bench_test_2/check").await;
+    }
 }

From cdf70c1b3319245746635c4ab8d0d324f442b896 Mon Sep 17 00:00:00 2001
From: gnolong <2391353625@qq.com>
Date: Fri, 26 Sep 2025 15:46:55 +0800
Subject: [PATCH 15/17] fix pg sequence

---
 dt-common/src/config/task_config.rs           |  4 ++--
 .../src/extractor/pg/pg_struct_extractor.rs   |  8 +++++--
 .../src/meta_fetcher/pg/pg_struct_fetcher.rs  | 21 ++++++-------------
 .../bench_test_2/check/task_config.ini        |  1 +
 .../bench_test_2/src_to_dst/task_config.ini   |  6 +++---
 5 files changed, 18 insertions(+), 22 deletions(-)

diff --git a/dt-common/src/config/task_config.rs b/dt-common/src/config/task_config.rs
index 2f16dddf..2aa7c944 100644
--- a/dt-common/src/config/task_config.rs
+++ b/dt-common/src/config/task_config.rs
@@ -177,7 +177,7 @@ impl TaskConfig {
                 url,
                 db: String::new(),
                 dbs: Vec::new(),
-                db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 1),
+                db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 1000),
             },
 
             ExtractType::FoxlakeS3 => {
@@ -237,7 +237,7 @@ impl TaskConfig {
                 schema: String::new(),
                 schemas: Vec::new(),
                 do_global_structs: false,
-                db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 1),
+                db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 1000),
             },
 
             _ => bail! { not_supported_err },
diff --git a/dt-connector/src/extractor/pg/pg_struct_extractor.rs b/dt-connector/src/extractor/pg/pg_struct_extractor.rs
index 809dd974..932f324e 100644
--- a/dt-connector/src/extractor/pg/pg_struct_extractor.rs
+++ b/dt-connector/src/extractor/pg/pg_struct_extractor.rs
@@ -35,13 +35,17 @@ impl Extractor for PgStructExtractor {
             .chunks(self.db_batch_size)
             .map(|chunk| chunk.to_vec())
             .collect();
+        let do_global_structs = schema_chunks.len() - 1;
         for (flag, schema_chunk) in schema_chunks.into_iter().enumerate() {
             log_info!(
                 "PgStructExtractor extracts schemas: {}",
                 schema_chunk.join(",")
            );
-            self.extract_internal(schema_chunk.into_iter().collect(), flag == 0)
-                .await?;
+            self.extract_internal(
+                schema_chunk.into_iter().collect(),
+                flag == do_global_structs,
+            )
+            .await?;
         }
         self.base_extractor.wait_task_finish().await
     }
diff --git a/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs b/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs
index aafdfd36..47385f0e 100644
--- a/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs
+++ b/dt-connector/src/meta_fetcher/pg/pg_struct_fetcher.rs
@@ -151,27 +151,18 @@ impl PgStructFetcher {
     ) -> anyhow::Result>> {
         let mut results = HashMap::new();
 
-        let (sch_filter, tb_filter) = if !sch.is_empty() {
+        let tb_filter = if !sch.is_empty() {
             if !self.schemas.contains(sch) {
                 return Ok(results);
             }
             if !tb.is_empty() {
-                (
-                    format!("ns.nspname = '{}'", sch),
-                    format!("obj.sequence_schema='{}' AND tab.relname = '{}'", sch, tb),
-                )
+                format!("obj.sequence_schema='{}' AND tab.relname = '{}'", sch, tb)
             } else {
-                (
-                    format!("ns.nspname = '{}'", sch),
-                    format!("obj.sequence_schema = '{}'", sch),
-                )
+                format!("obj.sequence_schema = '{}'", sch)
             }
         } else if !self.schemas.is_empty() {
             let schemas_str = &self.get_schemas_str();
-            (
-                format!("ns.nspname IN ({})", schemas_str),
-                format!("obj.sequence_schema IN ({})", schemas_str),
-            )
+            format!("obj.sequence_schema IN ({})", schemas_str)
         } else {
             return Ok(results);
         };
@@ -197,9 +188,9 @@ impl PgStructFetcher {
             JOIN pg_class AS tab ON (dep.refobjid = tab.oid)
             WHERE
                 {}
-                AND {}
+                AND ns.nspname = obj.sequence_schema
                 AND dep.deptype='a'",
-            sch_filter, tb_filter
+            tb_filter
         );
 
         let mut rows = sqlx::query(&sql).fetch(&self.conn_pool);
diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/check/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/check/task_config.ini
index 0ea5ad41..ee593328 100644
--- a/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/check/task_config.ini
+++ b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/check/task_config.ini
@@ -16,6 +16,7 @@ ignore_dbs=
 do_tbs=
 ignore_tbs=
 do_events=
+do_structures=database,table,constraint,sequence,comment,index
 
 [router]
 db_map=
diff --git a/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/task_config.ini b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/task_config.ini
index ec4e3329..ebf83218 100644
--- a/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/task_config.ini
+++ b/dt-tests/tests/pg_to_pg/struct/batch_test/bench_test_2/src_to_dst/task_config.ini
@@ -2,7 +2,7 @@
 extract_type=struct
 db_type=pg
 url={pg_extractor_url}
-db_batch_size=10
+db_batch_size=100
 
 [sinker]
 sink_type=struct
 db_type=pg
 batch_size=1
 url={pg_sinker_url}
-# conflict_policy=interrupt
+#conflict_policy=interrupt
 conflict_policy=ignore
 
 [filter]
@@ -36,4 +36,4 @@ parallel_size=1
 
 [pipeline]
 checkpoint_interval_secs=1
-buffer_size=1000
\ No newline at end of file
+; buffer_size=1000
\ No newline at end of file

From 7b43e7119cfc23723f2f31bfb43733d880978198 Mon Sep 17 00:00:00 2001
From: caiq1nyu
Date: Sun, 28 Sep 2025 11:48:23 +0800
Subject: [PATCH 16/17] chore: change db_batch_size default value & do some format

---
 dt-common/src/config/task_config.rs           | 14 ++++++++--
 .../extractor/mysql/mysql_struct_extractor.rs | 28 +++++++++----------
 .../src/extractor/pg/pg_struct_extractor.rs   | 27 +++++++++---------
 dt-task/src/extractor_util.rs                 | 10 ++++---
 dt-task/src/task_runner.rs                    | 26 ++++++++---------
 5 files changed, 56 insertions(+), 49 deletions(-)

diff --git a/dt-common/src/config/task_config.rs b/dt-common/src/config/task_config.rs
index 2aa7c944..95e5a648 100644
--- a/dt-common/src/config/task_config.rs
+++ b/dt-common/src/config/task_config.rs
@@ -50,6 +50,8 @@ pub struct TaskConfig {
     pub metrics: MetricsConfig,
 }
 
+pub const DEFAULT_DB_BATCH_SIZE: usize = 100;
+
 // sections
 const EXTRACTOR: &str = "extractor";
 const SINKER: &str = "sinker";
@@ -177,7 +179,11 @@ impl TaskConfig {
                 url,
                 db: String::new(),
                 dbs: Vec::new(),
-                db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 1000),
+                db_batch_size: loader.get_with_default(
+                    EXTRACTOR,
+                    "db_batch_size",
+                    DEFAULT_DB_BATCH_SIZE,
+                ),
             },
 
             ExtractType::FoxlakeS3 => {
@@ -237,7 +243,11 @@ impl TaskConfig {
                 schema: String::new(),
                 schemas: Vec::new(),
                 do_global_structs: false,
-                db_batch_size: loader.get_with_default(EXTRACTOR, "db_batch_size", 1000),
+                db_batch_size: loader.get_with_default(
+                    EXTRACTOR,
+                    "db_batch_size",
+                    DEFAULT_DB_BATCH_SIZE,
+                ),
             },
 
             _ => bail! { not_supported_err },
diff --git a/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs b/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs
index 191f9d76..ce5db336 100644
--- a/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs
+++ b/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs
@@ -1,15 +1,6 @@
 use std::collections::HashSet;
 
-use anyhow::bail;
 use async_trait::async_trait;
-use dt_common::error::Error;
-use dt_common::meta::struct_meta::struct_data::StructData;
-use dt_common::{log_info, rdb_filter::RdbFilter};
-
-use dt_common::meta::{
-    mysql::mysql_meta_manager::MysqlMetaManager,
-    struct_meta::statement::struct_statement::StructStatement,
-};
 use sqlx::{MySql, Pool};
 
 use crate::close_conn_pool;
@@ -17,6 +8,15 @@ use crate::{
     extractor::base_extractor::BaseExtractor,
     meta_fetcher::mysql::mysql_struct_fetcher::MysqlStructFetcher, Extractor,
 };
+use dt_common::{
+    config::task_config::DEFAULT_DB_BATCH_SIZE,
+    log_info,
+    meta::{
+        mysql::mysql_meta_manager::MysqlMetaManager,
+        struct_meta::{statement::struct_statement::StructStatement, struct_data::StructData},
+    },
+    rdb_filter::RdbFilter,
+};
 
 pub struct MysqlStructExtractor {
     pub base_extractor: BaseExtractor,
@@ -81,13 +81,11 @@ impl MysqlStructExtractor {
         self.base_extractor.push_struct(struct_data).await
     }
 
-    pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<()> {
-        let max_db_batch_size = 1000;
-        let min_db_batch_size = 1;
-        if db_batch_size < min_db_batch_size || db_batch_size > max_db_batch_size {
-            bail! {Error::ConfigError(format!(r#"db_batch_size {} is not valid, should be in range ({}, {})"#, db_batch_size, min_db_batch_size, max_db_batch_size))}
+    pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<usize> {
+        if db_batch_size < 1 || db_batch_size > 1000 {
+            Ok(DEFAULT_DB_BATCH_SIZE)
         } else {
-            Ok(())
+            Ok(db_batch_size)
         }
     }
 }
diff --git a/dt-connector/src/extractor/pg/pg_struct_extractor.rs b/dt-connector/src/extractor/pg/pg_struct_extractor.rs
index 932f324e..d1a53370 100644
--- a/dt-connector/src/extractor/pg/pg_struct_extractor.rs
+++ b/dt-connector/src/extractor/pg/pg_struct_extractor.rs
@@ -1,14 +1,6 @@
 use std::collections::HashSet;
 
-use anyhow::bail;
 use async_trait::async_trait;
-use dt_common::error::Error;
-use dt_common::meta::struct_meta::struct_data::StructData;
-use dt_common::{log_info, rdb_filter::RdbFilter};
-
-use dt_common::meta::struct_meta::statement::struct_statement::StructStatement;
-use dt_common::meta::struct_meta::structure::structure_type::StructureType;
-
 use sqlx::{Pool, Postgres};
 
 use crate::close_conn_pool;
@@ -16,6 +8,15 @@ use crate::{
     extractor::base_extractor::BaseExtractor,
     meta_fetcher::pg::pg_struct_fetcher::PgStructFetcher, Extractor,
 };
+use dt_common::{
+    config::task_config::DEFAULT_DB_BATCH_SIZE,
+    log_info,
+    meta::struct_meta::{
+        statement::struct_statement::StructStatement, struct_data::StructData,
+        structure::structure_type::StructureType,
+    },
+    rdb_filter::RdbFilter,
+};
 
 pub struct PgStructExtractor {
     pub base_extractor: BaseExtractor,
@@ -99,13 +100,11 @@ impl PgStructExtractor {
         self.base_extractor.push_struct(struct_data).await
     }
 
-    pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<()> {
-        let max_db_batch_size = 1000;
-        let min_db_batch_size = 1;
-        if db_batch_size < min_db_batch_size || db_batch_size > max_db_batch_size {
-            bail! {Error::ConfigError(format!(r#"db_batch_size {} is not valid, should be in range ({}, {})"#, db_batch_size, min_db_batch_size, max_db_batch_size))}
+    pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<usize> {
+        if db_batch_size < 1 || db_batch_size > 1000 {
+            Ok(DEFAULT_DB_BATCH_SIZE)
         } else {
-            Ok(())
+            Ok(db_batch_size)
         }
     }
 }
diff --git a/dt-task/src/extractor_util.rs b/dt-task/src/extractor_util.rs
index 20b63301..4b77463d 100644
--- a/dt-task/src/extractor_util.rs
+++ b/dt-task/src/extractor_util.rs
@@ -338,7 +338,8 @@ impl ExtractorUtil {
                 db_batch_size,
                 ..
             } => {
-                MysqlStructExtractor::validate_db_batch_size(db_batch_size)?;
+                let db_batch_size_validated =
+                    MysqlStructExtractor::validate_db_batch_size(db_batch_size)?;
                 // TODO, pass max_connections as parameter
                 let conn_pool =
                     TaskUtil::create_mysql_conn_pool(&url, 2, enable_sqlx_log, false).await?;
@@ -347,7 +348,7 @@ impl ExtractorUtil {
                     dbs,
                     filter,
                     base_extractor,
-                    db_batch_size,
+                    db_batch_size: db_batch_size_validated,
                 };
                 Box::new(extractor)
             }
@@ -359,7 +360,8 @@ impl ExtractorUtil {
                 db_batch_size,
                 ..
             } => {
-                PgStructExtractor::validate_db_batch_size(db_batch_size)?;
+                let db_batch_size_validated =
+                    PgStructExtractor::validate_db_batch_size(db_batch_size)?;
                 // TODO, pass max_connections as parameter
                 let conn_pool =
                     TaskUtil::create_pg_conn_pool(&url, 2, enable_sqlx_log, false).await?;
@@ -369,7 +371,7 @@ impl ExtractorUtil {
                     do_global_structs,
                     filter,
                     base_extractor,
-                    db_batch_size,
+                    db_batch_size: db_batch_size_validated,
                 };
                 Box::new(extractor)
             }
diff --git a/dt-task/src/task_runner.rs b/dt-task/src/task_runner.rs
index baaa107f..ab235c69 100644
--- a/dt-task/src/task_runner.rs
+++ b/dt-task/src/task_runner.rs
@@ -146,21 +146,19 @@ impl TaskRunner {
         match &self.config.extractor {
             ExtractorConfig::MysqlStruct { url, .. }
            | ExtractorConfig::PgStruct { url, .. } => {
-                let mut pending_task = self
+                let mut pending_tasks = self
                     .build_pending_tasks(url, &router, &snapshot_resumer, &cdc_resumer, false)
                     .await?;
-                if !pending_task.is_empty() {
-                    if let Some(task_context) = pending_task.pop_front() {
-                        self.clone()
-                            .start_single_task(
-                                &task_context.extractor_config,
-                                &router,
-                                &snapshot_resumer,
-                                &cdc_resumer,
-                                false,
-                            )
-                            .await?
-                    }
+                if let Some(task_context) = pending_tasks.pop_front() {
+                    self.clone()
+                        .start_single_task(
+                            &task_context.extractor_config,
+                            &router,
+                            &snapshot_resumer,
+                            &cdc_resumer,
+                            false,
+                        )
+                        .await?
                 }
             }
 
@@ -862,7 +860,7 @@ impl TaskRunner {
             } => ExtractorConfig::MysqlStruct {
                 url: url.clone(),
                 db: db.clone(),
-                dbs: schemas.clone(),
+                dbs: schemas,
                 db_batch_size: db_batch_size.clone(),
             },
             ExtractorConfig::PgStruct {

From 0fa26a2d1ddcb2e899d3b81ffd1df451d79c7962 Mon Sep 17 00:00:00 2001
From: caiq1nyu
Date: Sun, 28 Sep 2025 11:51:53 +0800
Subject: [PATCH 17/17] chore: code format

---
 .../src/extractor/mysql/mysql_struct_extractor.rs    | 9 +++++++--
 dt-connector/src/extractor/pg/pg_struct_extractor.rs | 9 +++++++--
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs b/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs
index ce5db336..080a770a 100644
--- a/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs
+++ b/dt-connector/src/extractor/mysql/mysql_struct_extractor.rs
@@ -10,7 +10,7 @@ use crate::{
 };
 use dt_common::{
     config::task_config::DEFAULT_DB_BATCH_SIZE,
-    log_info,
+    log_info, log_warn,
     meta::{
         mysql::mysql_meta_manager::MysqlMetaManager,
         struct_meta::{statement::struct_statement::StructStatement, struct_data::StructData},
@@ -53,7 +53,7 @@ impl MysqlStructExtractor {
         let meta_manager = MysqlMetaManager::new(self.conn_pool.clone()).await?;
         let mut fetcher = MysqlStructFetcher {
             conn_pool: self.conn_pool.to_owned(),
-            dbs: dbs,
+            dbs,
             filter: Some(self.filter.to_owned()),
             meta_manager,
         };
@@ -83,6 +83,11 @@ impl MysqlStructExtractor {
     pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<usize> {
         if db_batch_size < 1 || db_batch_size > 1000 {
+            log_warn!(
+                "db_batch_size {} is not valid, using default value: {}",
+                db_batch_size,
+                DEFAULT_DB_BATCH_SIZE
+            );
             Ok(DEFAULT_DB_BATCH_SIZE)
         } else {
             Ok(db_batch_size)
         }
diff --git a/dt-connector/src/extractor/pg/pg_struct_extractor.rs b/dt-connector/src/extractor/pg/pg_struct_extractor.rs
index d1a53370..15b6c92f 100644
--- a/dt-connector/src/extractor/pg/pg_struct_extractor.rs
+++ b/dt-connector/src/extractor/pg/pg_struct_extractor.rs
@@ -10,7 +10,7 @@ use crate::{
 };
 use dt_common::{
     config::task_config::DEFAULT_DB_BATCH_SIZE,
-    log_info,
+    log_info, log_warn,
     meta::struct_meta::{
         statement::struct_statement::StructStatement, struct_data::StructData,
         structure::structure_type::StructureType,
@@ -64,7 +64,7 @@ impl PgStructExtractor {
     ) -> anyhow::Result<()> {
         let mut pg_fetcher = PgStructFetcher {
             conn_pool: self.conn_pool.to_owned(),
-            schemas: schemas,
+            schemas,
             filter: Some(self.filter.to_owned()),
         };
 
@@ -102,6 +102,11 @@ impl PgStructExtractor {
     pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<usize> {
         if db_batch_size < 1 || db_batch_size > 1000 {
+            log_warn!(
+                "db_batch_size {} is not valid, using default value: {}",
+                db_batch_size,
+                DEFAULT_DB_BATCH_SIZE
+            );
             Ok(DEFAULT_DB_BATCH_SIZE)
         } else {
             Ok(db_batch_size)