5 changes: 5 additions & 0 deletions dt-common/src/monitor/prometheus_metrics.rs
@@ -179,6 +179,11 @@ impl PrometheusMetrics {
"the records estimated by extractor plan",
TaskMetricsType::ExtractorPlanRecords,
);
register_handler(
"progress",
"the progress of task",
TaskMetricsType::Progress,
);
}
TaskType::Cdc => {
register_handler(
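The hunk above registers a new "progress" handler for snapshot tasks next to the existing extractor-plan metric. For readers unfamiliar with how such a value surfaces in Prometheus, here is a minimal sketch of the same idea using the `prometheus` crate directly; the gauge/registry setup is an illustrative assumption, not the project's internal `register_handler` API.

```rust
// Minimal sketch (assumption): exposing a 0-100 task progress value as a Prometheus
// gauge. The project registers its metric through an internal register_handler; this
// example only illustrates the shape of the resulting metric.
use prometheus::{Gauge, Opts, Registry};

fn register_progress(registry: &Registry) -> prometheus::Result<Gauge> {
    let gauge = Gauge::with_opts(Opts::new("progress", "the progress of task"))?;
    registry.register(Box::new(gauge.clone()))?;
    Ok(gauge)
}

fn main() -> prometheus::Result<()> {
    let registry = Registry::new();
    let progress = register_progress(&registry)?;
    // A snapshot task with 40 tables, 10 of them already finished, reports 25.
    progress.set(10.0 * 100.0 / 40.0);
    println!("{}", progress.get());
    Ok(())
}
```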
3 changes: 3 additions & 0 deletions dt-common/src/monitor/task_metrics.rs
@@ -20,6 +20,9 @@ pub enum TaskMetricsType {
// TODO:
Delay,
Timestamp,
Progress,
TotalProgressCount,
FinishedProgressCount,

// TODO: These metrics describe the records and bytes pulled by extractor, different from ExtractorPushed*, which describe the overall traffic before filtering
ExtractorRpsMax,
19 changes: 18 additions & 1 deletion dt-common/src/monitor/task_monitor.rs
@@ -1,4 +1,4 @@
use std::{collections::BTreeMap, sync::Arc};
use std::{cmp, collections::BTreeMap, sync::Arc};

use async_trait::async_trait;
use dashmap::DashMap;
@@ -335,11 +335,28 @@ impl TaskMonitor {
}
calc_nowindow_metrics(&self.no_window_metrics_map, calc_monitors);

let mut total_progress_count = 0;
let mut finished_progress_count = 0;
for item in self.no_window_metrics_map.iter() {
metrics.insert(*item.key(), *item.value());
match item.key() {
TaskMetricsType::TotalProgressCount => {
total_progress_count = *item.value();
}
TaskMetricsType::FinishedProgressCount => {
finished_progress_count = *item.value();
}
_ => {}
}
#[cfg(feature = "metrics")]
self.prometheus_metrics.set_metrics(&metrics);
}
if total_progress_count > 0 {
metrics.insert(
TaskMetricsType::Progress,
cmp::min(finished_progress_count * 100 / total_progress_count, 100),
);
}

Some(metrics)
}
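The TaskMonitor change derives Progress from the two new counters while copying no-window metrics into the result map: it reads TotalProgressCount and FinishedProgressCount, emits Progress only when the total is non-zero, and clamps the integer percentage at 100. A standalone sketch of that arithmetic (the helper name is made up for illustration; the real code computes this inline):

```rust
// Sketch of the progress derivation added above, using plain u64 counters.
fn progress_percent(finished: u64, total: u64) -> Option<u64> {
    if total == 0 {
        // No TotalProgressCount recorded yet: skip the metric rather than divide by zero.
        return None;
    }
    // Integer percentage, clamped to 100 in case finished briefly exceeds total.
    Some(std::cmp::min(finished * 100 / total, 100))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn progress_is_guarded_and_clamped() {
        assert_eq!(progress_percent(0, 0), None);
        assert_eq!(progress_percent(3, 12), Some(25));
        assert_eq!(progress_percent(15, 12), Some(100));
    }
}
```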
10 changes: 5 additions & 5 deletions dt-common/src/monitor/time_window_counter.rs
@@ -19,10 +19,10 @@ impl Default for WindowCounterStatistics {
Self {
sum: 0,
max: 0,
min: u64::MAX,
min: 0,
avg_by_count: 0,
max_by_sec: 0,
min_by_sec: u64::MAX,
min_by_sec: 0,
avg_by_sec: 0,
count: 0,
}
@@ -72,12 +72,12 @@ impl TimeWindowCounter {
self.refresh_window();

if self.counters.is_empty() {
return WindowCounterStatistics {
..Default::default()
};
return WindowCounterStatistics::default();
}

let mut statistics = WindowCounterStatistics::default();
statistics.min = u64::MAX;
statistics.min_by_sec = u64::MAX;

let mut sum_in_current_sec = 0;
let mut current_elapsed_secs = None;
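The WindowCounterStatistics change moves the u64::MAX seeding out of Default and into the non-empty statistics path: an empty window now reports min and min_by_sec as 0 instead of leaking the u64::MAX sentinel, while u64::MAX is still used as the fold seed when there are counters to compare against. A simplified illustration of the same idea (types reduced to a single slice; the real statistics are built inside TimeWindowCounter):

```rust
// Illustration of the min-seeding change, with simplified types.
fn window_min(counts: &[u64]) -> u64 {
    if counts.is_empty() {
        // Empty window: report 0 instead of the u64::MAX seed the old Default leaked.
        return 0;
    }
    // Non-empty window: u64::MAX is only a fold seed and is always replaced.
    counts.iter().fold(u64::MAX, |min, &c| min.min(c))
}

fn main() {
    assert_eq!(window_min(&[]), 0);
    assert_eq!(window_min(&[7, 3, 9]), 3);
    println!("ok");
}
```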
6 changes: 3 additions & 3 deletions dt-connector/src/extractor/base_extractor.rs
@@ -11,7 +11,7 @@ use dt_common::{
config_token_parser::{ConfigTokenParser, TokenEscapePair},
},
error::Error,
log_debug, log_error, log_info, log_warn,
log_debug, log_error, log_info,
meta::{
dcl_meta::{dcl_data::DclData, dcl_parser::DclParser},
ddl_meta::ddl_data::DdlData,
@@ -220,7 +220,7 @@ impl BaseExtractor {
);

if heartbeat_interval_secs == 0 || heartbeat_tb.is_empty() {
log_warn!(
log_info!(
"heartbeat disabled, heartbeat_tb: {}, heartbeat_interval_secs: {}",
heartbeat_tb,
heartbeat_interval_secs
@@ -235,7 +235,7 @@ impl BaseExtractor {
);

if schema_tb.len() < 2 {
log_warn!("heartbeat disabled, heartbeat_tb should be like schema.tb");
log_info!("heartbeat disabled, heartbeat_tb should be like schema.tb");
return vec![];
}
schema_tb
6 changes: 3 additions & 3 deletions dt-connector/src/extractor/redis/redis_psync_extractor.rs
@@ -25,7 +25,7 @@ use dt_common::rdb_filter::RdbFilter;
use dt_common::utils::sql_util::SqlUtil;
use dt_common::utils::time_util::TimeUtil;
use dt_common::{error::Error, log_info};
use dt_common::{log_debug, log_error, log_position, log_warn};
use dt_common::{log_debug, log_error, log_position};

pub struct RedisPsyncExtractor {
pub base_extractor: BaseExtractor,
@@ -238,7 +238,7 @@ impl RedisPsyncExtractor {
)
.await?;
} else {
log_warn!("heartbeat disabled, heartbeat_tb should be like db.key");
log_info!("heartbeat disabled, heartbeat_tb should be like db.key");
}

let mut heartbeat_timestamp = String::new();
@@ -391,7 +391,7 @@ impl RedisPsyncExtractor {
);

if self.heartbeat_interval_secs == 0 || db_id == i64::MIN || key.is_empty() {
log_warn!("heartbeat disabled, heartbeat_tb should be like db_id.key");
log_info!("heartbeat disabled, heartbeat_tb should be like db_id.key");
return Ok(());
}

96 changes: 69 additions & 27 deletions dt-task/src/task_runner.rs
@@ -11,8 +11,13 @@ use anyhow::{bail, Context};
use log4rs::config::RawConfig;
use ratelimit::Ratelimiter;
use tokio::{
fs::metadata, fs::File, io::AsyncReadExt, sync::Mutex, sync::RwLock, task::JoinSet,
time::Duration, try_join,
fs::{metadata, File},
io::AsyncReadExt,
select,
sync::{Mutex, RwLock},
task::JoinSet,
time::Duration,
try_join,
};

use super::{
@@ -180,9 +185,12 @@ impl TaskRunner {
.map(|s| s.to_owned())
.collect::<Vec<_>>();
if let Some(task_type) = task_type_option {
log_info!("begin to estimate record count");
let record_count =
TaskUtil::estimate_record_count(&task_type, url, db_type, &schemas, &filter)
.await?;
log_info!("estimate record count: {}", record_count);

self.task_monitor
.add_no_window_metrics(TaskMetricsType::ExtractorPlanRecords, record_count);
}
@@ -222,9 +230,15 @@ impl TaskRunner {

// find pending tables
let tbs = TaskUtil::list_tbs(url, schema, db_type).await?;

self.task_monitor
.add_no_window_metrics(TaskMetricsType::TotalProgressCount, tbs.len() as u64);

let mut finished_tbs = 0;
for tb in tbs.iter() {
if snapshot_resumer.check_finished(schema, tb) {
log_info!("schema: {}, tb: {}, already finished", schema, tb);
finished_tbs += 1;
continue;
}
if filter.filter_event(schema, tb, &RowType::Insert) {
@@ -233,6 +247,8 @@
}
pending_tbs.push_back((schema.to_owned(), tb.to_owned()));
}
self.task_monitor
.add_no_window_metrics(TaskMetricsType::FinishedProgressCount, finished_tbs as u64);
}

// start a thread to flush global monitors
@@ -573,6 +589,8 @@ impl TaskRunner {
}
.to_string()
);
self.task_monitor
.add_no_window_metrics(TaskMetricsType::FinishedProgressCount, 1);
}

// remove monitors from global monitors
@@ -726,40 +744,64 @@
T1: FlushableMonitor + Send + Sync + 'static,
T2: FlushableMonitor + Send + Sync + 'static,
{
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
interval.tick().await;

loop {
// do an extra flush before exit if task finished
let finished = shut_down.load(Ordering::Acquire);
if !finished {
TimeUtil::sleep_millis(interval_secs * 1000).await;
if shut_down.load(Ordering::Acquire) {
Self::do_flush_monitors(t1_monitors, t2_monitors).await;
break;
}

let t1_futures = t1_monitors
.iter()
.map(|monitor| {
let monitor = monitor.clone();
async move { monitor.flush().await }
})
.collect::<Vec<_>>();

let t2_futures = t2_monitors
.iter()
.map(|monitor| {
let monitor = monitor.clone();
async move { monitor.flush().await }
})
.collect::<Vec<_>>();

tokio::join!(
futures::future::join_all(t1_futures),
futures::future::join_all(t2_futures)
);
select! {
_ = interval.tick() => {
Self::do_flush_monitors(t1_monitors, t2_monitors).await;
}
_ = Self::wait_for_shutdown(shut_down.clone()) => {
log_info!("task shutdown detected, do final flush");
Self::do_flush_monitors(t1_monitors, t2_monitors).await;
break;
}
}
}
}

if finished {
async fn wait_for_shutdown(shut_down: Arc<AtomicBool>) {
loop {
if shut_down.load(Ordering::Acquire) {
break;
}
TimeUtil::sleep_millis(100).await;
}
}

async fn do_flush_monitors<T1, T2>(t1_monitors: &[Arc<T1>], t2_monitors: &[Arc<T2>])
where
T1: FlushableMonitor + Send + Sync + 'static,
T2: FlushableMonitor + Send + Sync + 'static,
{
let t1_futures = t1_monitors
.iter()
.map(|monitor| {
let monitor = monitor.clone();
async move { monitor.flush().await }
})
.collect::<Vec<_>>();

let t2_futures = t2_monitors
.iter()
.map(|monitor| {
let monitor = monitor.clone();
async move { monitor.flush().await }
})
.collect::<Vec<_>>();

tokio::join!(
futures::future::join_all(t1_futures),
futures::future::join_all(t2_futures)
);
}

async fn pre_single_task(&self, sinker_data_marker: Option<DataMarker>) -> anyhow::Result<()> {
// create heartbeat table
let schema_tb = match &self.config.extractor {
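The task_runner refactor replaces the sleep-and-check flush loop with a tokio::time::interval driven by select!: each iteration either flushes on the next tick or, as soon as wait_for_shutdown observes the shut_down flag, performs one final flush and exits, with the flushing itself factored into do_flush_monitors. A self-contained sketch of that loop shape (flush_all and the timings are placeholders, not the project's monitors):

```rust
// Standalone sketch of the flush-loop shape introduced above: flush on a fixed
// interval, and do one final flush as soon as shutdown is observed.
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::time::{interval, sleep, Duration};

async fn flush_all() {
    // Placeholder for flushing the task/extractor/sinker monitors.
    println!("flushed");
}

async fn wait_for_shutdown(shut_down: Arc<AtomicBool>) {
    while !shut_down.load(Ordering::Acquire) {
        sleep(Duration::from_millis(100)).await;
    }
}

async fn flush_loop(shut_down: Arc<AtomicBool>, interval_secs: u64) {
    let mut ticker = interval(Duration::from_secs(interval_secs));
    // A tokio interval's first tick completes immediately; consume it so the
    // first real flush happens one full interval after startup.
    ticker.tick().await;
    loop {
        tokio::select! {
            _ = ticker.tick() => flush_all().await,
            _ = wait_for_shutdown(shut_down.clone()) => {
                // Shutdown detected: flush one last time, then exit the loop.
                flush_all().await;
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let shut_down = Arc::new(AtomicBool::new(false));
    let flag = shut_down.clone();
    tokio::spawn(async move {
        sleep(Duration::from_secs(3)).await;
        flag.store(true, Ordering::Release);
    });
    flush_loop(shut_down, 1).await;
}
```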