Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dt-common/src/config/extractor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ pub enum ExtractorConfig {
MysqlStruct {
url: String,
db: String,
dbs: Vec<String>,
db_batch_size: usize,
},

PgStruct {
url: String,
schema: String,
schemas: Vec<String>,
do_global_structs: bool,
db_batch_size: usize,
},

MysqlSnapshot {
Expand Down
14 changes: 14 additions & 0 deletions dt-common/src/config/task_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub struct TaskConfig {
pub metrics: MetricsConfig,
}

/// Default number of databases/schemas a struct extractor processes per
/// batch when `db_batch_size` is absent from the `[extractor]` section
/// (or rejected by `validate_db_batch_size`).
pub const DEFAULT_DB_BATCH_SIZE: usize = 100;

// sections
const EXTRACTOR: &str = "extractor";
const SINKER: &str = "sinker";
Expand Down Expand Up @@ -176,6 +178,12 @@ impl TaskConfig {
ExtractType::Struct => ExtractorConfig::MysqlStruct {
url,
db: String::new(),
dbs: Vec::new(),
db_batch_size: loader.get_with_default(
EXTRACTOR,
"db_batch_size",
DEFAULT_DB_BATCH_SIZE,
),
},

ExtractType::FoxlakeS3 => {
Expand Down Expand Up @@ -233,7 +241,13 @@ impl TaskConfig {
ExtractType::Struct => ExtractorConfig::PgStruct {
url,
schema: String::new(),
schemas: Vec::new(),
do_global_structs: false,
db_batch_size: loader.get_with_default(
EXTRACTOR,
"db_batch_size",
DEFAULT_DB_BATCH_SIZE,
),
},

_ => bail! { not_supported_err },
Expand Down
63 changes: 46 additions & 17 deletions dt-connector/src/extractor/mysql/mysql_struct_extractor.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,45 @@
use async_trait::async_trait;
use dt_common::meta::struct_meta::struct_data::StructData;
use dt_common::{log_info, rdb_filter::RdbFilter};
use std::collections::HashSet;

use dt_common::meta::{
mysql::mysql_meta_manager::MysqlMetaManager,
struct_meta::statement::struct_statement::StructStatement,
};
use async_trait::async_trait;
use sqlx::{MySql, Pool};

use crate::close_conn_pool;
use crate::{
extractor::base_extractor::BaseExtractor,
meta_fetcher::mysql::mysql_struct_fetcher::MysqlStructFetcher, Extractor,
};
use dt_common::{
config::task_config::DEFAULT_DB_BATCH_SIZE,
log_info, log_warn,
meta::{
mysql::mysql_meta_manager::MysqlMetaManager,
struct_meta::{statement::struct_statement::StructStatement, struct_data::StructData},
},
rdb_filter::RdbFilter,
};

/// Extracts MySQL structure definitions (databases, tables) and pushes the
/// resulting DDL statements downstream through the base extractor.
pub struct MysqlStructExtractor {
    pub base_extractor: BaseExtractor,
    // Connection pool shared with the struct fetcher.
    pub conn_pool: Pool<MySql>,
    // Single-database field; appears superseded by `dbs` after the
    // multi-db batch refactor — confirm before removing.
    pub db: String,
    // All databases to extract; processed in chunks of `db_batch_size`.
    pub dbs: Vec<String>,
    // Decides which databases/tables/structure types are extracted.
    pub filter: RdbFilter,
    // Maximum number of databases handled per extraction batch.
    pub db_batch_size: usize,
}

#[async_trait]
impl Extractor for MysqlStructExtractor {
async fn extract(&mut self) -> anyhow::Result<()> {
log_info!("MysqlStructExtractor starts, schema: {}", self.db,);
self.extract_internal().await?;
log_info!("MysqlStructExtractor starts...");
let db_chunks: Vec<Vec<String>> = self
.dbs
.chunks(self.db_batch_size)
.map(|chunk| chunk.to_vec())
.collect();
for db_chunk in db_chunks.into_iter() {
log_info!("MysqlStructExtractor extracts dbs: {}", db_chunk.join(","));
self.extract_internal(db_chunk.into_iter().collect())
.await?;
}
self.base_extractor.wait_task_finish().await
}

Expand All @@ -35,22 +49,24 @@ impl Extractor for MysqlStructExtractor {
}

impl MysqlStructExtractor {
pub async fn extract_internal(&mut self) -> anyhow::Result<()> {
pub async fn extract_internal(&mut self, dbs: HashSet<String>) -> anyhow::Result<()> {
let meta_manager = MysqlMetaManager::new(self.conn_pool.clone()).await?;
let mut fetcher = MysqlStructFetcher {
conn_pool: self.conn_pool.to_owned(),
db: self.db.clone(),
dbs,
filter: Some(self.filter.to_owned()),
meta_manager,
};

// database
let database_statement = fetcher.get_create_database_statement().await?;
self.push_dt_data(StructStatement::MysqlCreateDatabase(database_statement))
.await?;
let database_statements = fetcher.get_create_database_statements("").await?;
for database_statement in database_statements {
self.push_dt_data(StructStatement::MysqlCreateDatabase(database_statement))
.await?;
}

// tables
for table_statement in fetcher.get_create_table_statements("").await? {
for table_statement in fetcher.get_create_table_statements("", "").await? {
self.push_dt_data(StructStatement::MysqlCreateTable(table_statement))
.await?;
}
Expand All @@ -59,9 +75,22 @@ impl MysqlStructExtractor {

pub async fn push_dt_data(&mut self, statement: StructStatement) -> anyhow::Result<()> {
let struct_data = StructData {
schema: self.db.clone(),
schema: "".to_string(),
statement,
};
self.base_extractor.push_struct(struct_data).await
}

pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<usize> {
if db_batch_size < 1 || db_batch_size > 1000 {
log_warn!(
"db_batch_size {} is not valid, using default value: {}",
db_batch_size,
DEFAULT_DB_BATCH_SIZE
);
Ok(DEFAULT_DB_BATCH_SIZE)
} else {
Ok(db_batch_size)
}
}
}
76 changes: 58 additions & 18 deletions dt-connector/src/extractor/pg/pg_struct_extractor.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,53 @@
use async_trait::async_trait;
use dt_common::meta::struct_meta::struct_data::StructData;
use dt_common::{log_info, rdb_filter::RdbFilter};

use dt_common::meta::struct_meta::statement::struct_statement::StructStatement;
use dt_common::meta::struct_meta::structure::structure_type::StructureType;
use std::collections::HashSet;

use async_trait::async_trait;
use sqlx::{Pool, Postgres};

use crate::close_conn_pool;
use crate::{
extractor::base_extractor::BaseExtractor, meta_fetcher::pg::pg_struct_fetcher::PgStructFetcher,
Extractor,
};
use dt_common::{
config::task_config::DEFAULT_DB_BATCH_SIZE,
log_info, log_warn,
meta::struct_meta::{
statement::struct_statement::StructStatement, struct_data::StructData,
structure::structure_type::StructureType,
},
rdb_filter::RdbFilter,
};

/// Extracts Postgres structure definitions (schemas, tables, and optionally
/// global structures such as RBAC) and pushes the resulting DDL statements
/// downstream through the base extractor.
pub struct PgStructExtractor {
    pub base_extractor: BaseExtractor,
    // Connection pool shared with the struct fetcher.
    pub conn_pool: Pool<Postgres>,
    // Single-schema field; appears superseded by `schemas` after the
    // multi-schema batch refactor — confirm before removing.
    pub schema: String,
    // All schemas to extract; processed in chunks of `db_batch_size`.
    pub schemas: Vec<String>,
    // When true, global (cluster-wide) structures such as RBAC are
    // extracted in addition to per-schema objects.
    pub do_global_structs: bool,
    // Decides which schemas/tables/structure types are extracted.
    pub filter: RdbFilter,
    // Maximum number of schemas handled per extraction batch.
    pub db_batch_size: usize,
}

#[async_trait]
impl Extractor for PgStructExtractor {
async fn extract(&mut self) -> anyhow::Result<()> {
log_info!("PgStructExtractor starts, schema: {}", self.schema);
self.extract_internal().await?;
log_info!("PgStructExtractor starts...");
let schema_chunks: Vec<Vec<String>> = self
.schemas
.chunks(self.db_batch_size)
.map(|chunk| chunk.to_vec())
.collect();
let do_global_structs = schema_chunks.len() - 1;
for (flag, schema_chunk) in schema_chunks.into_iter().enumerate() {
log_info!(
"PgStructExtractor extracts schemas: {}",
schema_chunk.join(",")
);
self.extract_internal(
schema_chunk.into_iter().collect(),
flag == do_global_structs,
)
.await?;
}
self.base_extractor.wait_task_finish().await
}

Expand All @@ -35,25 +57,30 @@ impl Extractor for PgStructExtractor {
}

impl PgStructExtractor {
pub async fn extract_internal(&mut self) -> anyhow::Result<()> {
pub async fn extract_internal(
&mut self,
schemas: HashSet<String>,
do_global_structs: bool,
) -> anyhow::Result<()> {
let mut pg_fetcher = PgStructFetcher {
conn_pool: self.conn_pool.to_owned(),
schema: self.schema.clone(),
schemas,
filter: Some(self.filter.to_owned()),
};

// schema
let schema_statement = pg_fetcher.get_create_schema_statement().await?;
self.push_dt_data(StructStatement::PgCreateSchema(schema_statement))
.await?;
// schemas
for schema_statement in pg_fetcher.get_create_schema_statements("").await? {
self.push_dt_data(StructStatement::PgCreateSchema(schema_statement))
.await?;
}

// tables
for table_statement in pg_fetcher.get_create_table_statements("").await? {
for table_statement in pg_fetcher.get_create_table_statements("", "").await? {
self.push_dt_data(StructStatement::PgCreateTable(table_statement))
.await?;
}

if self.do_global_structs && !self.filter.filter_structure(&StructureType::Rbac) {
if do_global_structs && !self.filter.filter_structure(&StructureType::Rbac) {
// do rbac init
let rbac_statements = pg_fetcher.get_create_rbac_statements().await?;
for statement in rbac_statements {
Expand All @@ -67,9 +94,22 @@ impl PgStructExtractor {

pub async fn push_dt_data(&mut self, statement: StructStatement) -> anyhow::Result<()> {
let struct_data = StructData {
schema: self.schema.clone(),
schema: "".to_string(),
statement,
};
self.base_extractor.push_struct(struct_data).await
}

pub fn validate_db_batch_size(db_batch_size: usize) -> anyhow::Result<usize> {
if db_batch_size < 1 || db_batch_size > 1000 {
log_warn!(
"db_batch_size {} is not valid, using default value: {}",
db_batch_size,
DEFAULT_DB_BATCH_SIZE
);
Ok(DEFAULT_DB_BATCH_SIZE)
} else {
Ok(db_batch_size)
}
}
}
Loading
Loading