From a3d6c631a8217417d3c0003a6b421a6bc6e8f4ec Mon Sep 17 00:00:00 2001
From: Wael Abid
Date: Mon, 24 Apr 2023 19:11:44 -0400
Subject: [PATCH 1/3] always return false if not coordinator

---
 ludwig/trainers/trainer.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/ludwig/trainers/trainer.py b/ludwig/trainers/trainer.py
index 1b9f227f20c..03deb71f57f 100644
--- a/ludwig/trainers/trainer.py
+++ b/ludwig/trainers/trainer.py
@@ -1078,8 +1078,8 @@ def resume_files_exist(
         training_progress_tracker_path: str,
         training_checkpoint_path: str,
     ) -> bool:
-        missing_files = []
         if self.is_coordinator():
+            missing_files = []
             # training_progress.json
             if not path_exists(training_progress_tracker_path):
                 missing_files.append(training_progress_tracker_path)
@@ -1087,10 +1087,11 @@
             latest_ckpt = os.path.join(training_checkpoint_path, "latest.ckpt")
             if not path_exists(latest_ckpt):
                 missing_files.append(latest_ckpt)
-        if missing_files:
-            logger.warning(f"Could not find {missing_files} while trying to resume model training.")
-            return False
-        return True
+            if missing_files:
+                logger.warning(f"Could not find {missing_files} while trying to resume model training.")
+                return False
+            return True
+        return False
 
     def resume_training_progress_tracker(self, training_progress_tracker_path):
         progress_tracker_dict = None

From 4f527796b7e1ca34c0eb435f3cedff3ed4a73a89 Mon Sep 17 00:00:00 2001
From: Wael Abid
Date: Mon, 24 Apr 2023 19:18:49 -0400
Subject: [PATCH 2/3] only download artifacts on coordinator

---
 ludwig/trainers/trainer.py | 29 ++++++++++++++++-------------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/ludwig/trainers/trainer.py b/ludwig/trainers/trainer.py
index 03deb71f57f..c1fc0e144d0 100644
--- a/ludwig/trainers/trainer.py
+++ b/ludwig/trainers/trainer.py
@@ -605,19 +605,22 @@ def train(self, training_set, validation_set=None, test_set=None, save_path="mod
             test_summary_writer = SummaryWriter(os.path.join(tensorboard_log_dir, TEST))
 
         # ================ Resume logic ================
-        if self.resume and self.resume_files_exist(training_progress_tracker_path, training_checkpoints_path):
-            logger.info("Resuming training from previous run.")
-            progress_tracker = self.resume_training_progress_tracker(training_progress_tracker_path)
-            self.resume_weights_and_optimizer(training_checkpoints_path, checkpoint)
-        else:
-            logger.info("Creating fresh model training run.")
-            progress_tracker = get_new_progress_tracker(
-                batch_size=self.batch_size,
-                learning_rate=self.base_learning_rate,
-                best_eval_metric_value=get_initial_validation_value(self.validation_metric),
-                best_increase_batch_size_eval_metric=get_initial_validation_value(self.increase_batch_size_eval_metric),
-                output_features=output_features,
-            )
+        if self.is_coordinator():
+            if self.resume and self.resume_files_exist(training_progress_tracker_path, training_checkpoints_path):
+                logger.info("Resuming training from previous run.")
+                progress_tracker = self.resume_training_progress_tracker(training_progress_tracker_path)
+                self.resume_weights_and_optimizer(training_checkpoints_path, checkpoint)
+            else:
+                logger.info("Creating fresh model training run.")
+                progress_tracker = get_new_progress_tracker(
+                    batch_size=self.batch_size,
+                    learning_rate=self.base_learning_rate,
+                    best_eval_metric_value=get_initial_validation_value(self.validation_metric),
+                    best_increase_batch_size_eval_metric=get_initial_validation_value(
+                        self.increase_batch_size_eval_metric
+                    ),
+                    output_features=output_features,
+                )
 
         # Distributed: broadcast initial variable states from rank 0 to all other processes.
         # This is necessary to ensure consistent initialization of all workers when

From fb81a3d22377468122df94e4647ee68ab081ba74 Mon Sep 17 00:00:00 2001
From: Wael Abid
Date: Tue, 25 Apr 2023 10:59:31 -0400
Subject: [PATCH 3/3] allow download for non-coordinator processes

---
 ludwig/trainers/trainer.py | 55 +++++++++++++++++---------------
 1 file changed, 25 insertions(+), 30 deletions(-)

diff --git a/ludwig/trainers/trainer.py b/ludwig/trainers/trainer.py
index c1fc0e144d0..ed2ee1d6a40 100644
--- a/ludwig/trainers/trainer.py
+++ b/ludwig/trainers/trainer.py
@@ -605,22 +605,19 @@ def train(self, training_set, validation_set=None, test_set=None, save_path="mod
             test_summary_writer = SummaryWriter(os.path.join(tensorboard_log_dir, TEST))
 
         # ================ Resume logic ================
-        if self.is_coordinator():
-            if self.resume and self.resume_files_exist(training_progress_tracker_path, training_checkpoints_path):
-                logger.info("Resuming training from previous run.")
-                progress_tracker = self.resume_training_progress_tracker(training_progress_tracker_path)
-                self.resume_weights_and_optimizer(training_checkpoints_path, checkpoint)
-            else:
-                logger.info("Creating fresh model training run.")
-                progress_tracker = get_new_progress_tracker(
-                    batch_size=self.batch_size,
-                    learning_rate=self.base_learning_rate,
-                    best_eval_metric_value=get_initial_validation_value(self.validation_metric),
-                    best_increase_batch_size_eval_metric=get_initial_validation_value(
-                        self.increase_batch_size_eval_metric
-                    ),
-                    output_features=output_features,
-                )
+        if self.resume and self.resume_files_exist(training_progress_tracker_path, training_checkpoints_path):
+            logger.info("Resuming training from previous run.")
+            progress_tracker = self.resume_training_progress_tracker(training_progress_tracker_path)
+            self.resume_weights_and_optimizer(training_checkpoints_path, checkpoint)
+        else:
+            logger.info("Creating fresh model training run.")
+            progress_tracker = get_new_progress_tracker(
+                batch_size=self.batch_size,
+                learning_rate=self.base_learning_rate,
+                best_eval_metric_value=get_initial_validation_value(self.validation_metric),
+                best_increase_batch_size_eval_metric=get_initial_validation_value(self.increase_batch_size_eval_metric),
+                output_features=output_features,
+            )
 
         # Distributed: broadcast initial variable states from rank 0 to all other processes.
         # This is necessary to ensure consistent initialization of all workers when
@@ -1081,20 +1078,18 @@ def resume_files_exist(
         training_progress_tracker_path: str,
         training_checkpoint_path: str,
     ) -> bool:
-        if self.is_coordinator():
-            missing_files = []
-            # training_progress.json
-            if not path_exists(training_progress_tracker_path):
-                missing_files.append(training_progress_tracker_path)
-            # latest.ckpt in training_checkpoints/
-            latest_ckpt = os.path.join(training_checkpoint_path, "latest.ckpt")
-            if not path_exists(latest_ckpt):
-                missing_files.append(latest_ckpt)
-            if missing_files:
-                logger.warning(f"Could not find {missing_files} while trying to resume model training.")
-                return False
-            return True
-        return False
+        missing_files = []
+        # training_progress.json
+        if not path_exists(training_progress_tracker_path):
+            missing_files.append(training_progress_tracker_path)
+        # latest.ckpt in training_checkpoints/
+        latest_ckpt = os.path.join(training_checkpoint_path, "latest.ckpt")
+        if not path_exists(latest_ckpt):
+            missing_files.append(latest_ckpt)
+        if missing_files:
+            logger.warning(f"Could not find {missing_files} while trying to resume model training.")
+            return False
+        return True
 
     def resume_training_progress_tracker(self, training_progress_tracker_path):
         progress_tracker_dict = None
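
Net effect of the series: the is_coordinator() gates introduced in the first two patches are removed again, so every rank checks for training_progress.json and training_checkpoints/latest.ckpt and builds or restores its own progress tracker, which lets non-coordinator processes download the resume artifacts before rank 0 broadcasts the initial variable states. Below is a minimal, self-contained sketch of that final control flow, not Ludwig's actual implementation: the local path_exists stub, the ProgressTracker dataclass, the JSON loading, and setup_progress_tracker are simplifications introduced here only for illustration.

import json
import logging
import os
from dataclasses import dataclass

logger = logging.getLogger(__name__)


def path_exists(path: str) -> bool:
    # Stand-in for Ludwig's filesystem helper; a remote-storage-aware check (and
    # download) would live here, which is why every rank needs to run this path.
    return os.path.exists(path)


def resume_files_exist(training_progress_tracker_path: str, training_checkpoint_path: str) -> bool:
    # Mirrors the final version from PATCH 3/3: no coordinator gate.
    missing_files = []
    # training_progress.json
    if not path_exists(training_progress_tracker_path):
        missing_files.append(training_progress_tracker_path)
    # latest.ckpt in training_checkpoints/
    latest_ckpt = os.path.join(training_checkpoint_path, "latest.ckpt")
    if not path_exists(latest_ckpt):
        missing_files.append(latest_ckpt)
    if missing_files:
        logger.warning(f"Could not find {missing_files} while trying to resume model training.")
        return False
    return True


@dataclass
class ProgressTracker:
    # Illustrative stand-in for Ludwig's progress tracker state.
    epoch: int = 0
    steps: int = 0


def setup_progress_tracker(resume: bool, progress_tracker_path: str, checkpoint_dir: str) -> ProgressTracker:
    # Runs on every rank, as train() does after PATCH 3/3; rank 0 is still expected
    # to broadcast initial variable states afterwards, per the comment in train().
    if resume and resume_files_exist(progress_tracker_path, checkpoint_dir):
        logger.info("Resuming training from previous run.")
        with open(progress_tracker_path) as f:
            data = json.load(f)
        return ProgressTracker(epoch=data.get("epoch", 0), steps=data.get("steps", 0))
    logger.info("Creating fresh model training run.")
    return ProgressTracker()

Keeping the artifact check per-rank rather than coordinator-only avoids the situation where rank 0 resumes from files the other workers never fetched, which is presumably the motivation for reverting the coordinator-only gate in PATCH 3/3.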