Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -717,40 +717,57 @@ private static TaskStatus runSubtasks(
final int totalNumSpecs = tasks.size();
log.info("Generated [%d] MSQControllerTask specs", totalNumSpecs);

TaskStatus firstFailure = null;
int failCnt = 0;

for (MSQControllerTask eachTask : tasks) {
final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachTask);
if (!currentSubTaskHolder.setTask(eachTask)) {
for (int taskCnt = 0; taskCnt < tasks.size(); taskCnt++) {
final MSQControllerTask task = tasks.get(taskCnt);
final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(task);
if (!currentSubTaskHolder.setTask(task)) {
String errMsg = "Task was asked to stop. Finish as failed.";
log.info(errMsg);
log.info("%s", errMsg);
return TaskStatus.failure(compactionTaskId, errMsg);
}
try {
if (eachTask.isReady(toolbox.getTaskActionClient())) {
log.info("Running MSQControllerTask: " + json);
final TaskStatus eachResult = eachTask.run(toolbox);
if (!eachResult.isSuccess()) {
if (task.isReady(toolbox.getTaskActionClient())) {
log.info("Running MSQControllerTask number[%d]: %s", taskCnt, json);
final TaskStatus taskStatus = task.run(toolbox);
if (!taskStatus.isSuccess()) {
failCnt++;
log.warn("Failed to run MSQControllerTask: [%s].\nTrying the next MSQControllerTask.", json);
log.warn("Failed to run MSQControllerTask number[%d]: %s", taskCnt, taskStatus.getErrorMsg());
if (firstFailure == null) {
firstFailure = taskStatus;
}
}
} else {
failCnt++;
log.warn("MSQControllerTask is not ready: [%s].\nTrying the next MSQControllerTask.", json);
log.warn("MSQControllerTask number[%d] is not ready.", taskCnt);
}
}
catch (Exception e) {
failCnt++;
log.warn(e, "Failed to run MSQControllerTask: [%s].\nTrying the next MSQControllerTask.", json);
log.warn(e, "Failed to run MSQControllerTask number[%d].", taskCnt);
}
}
String msg = StringUtils.format(

log.info(
"Ran [%d] MSQControllerTasks, [%d] succeeded, [%d] failed",
totalNumSpecs,
totalNumSpecs - failCnt,
failCnt
);
log.info(msg);
return failCnt == 0 ? TaskStatus.success(compactionTaskId) : TaskStatus.failure(compactionTaskId, msg);

if (failCnt == 0) {
return TaskStatus.success(compactionTaskId);
} else if (firstFailure != null && failCnt == 1) {
return TaskStatus.failure(compactionTaskId, firstFailure.getErrorMsg());
} else {
final StringBuilder msgBuilder =
new StringBuilder().append(failCnt).append("/").append(totalNumSpecs).append(" jobs failed");
if (firstFailure != null) {
msgBuilder.append("; first failure was: ").append(firstFailure.getErrorMsg());
}
return TaskStatus.failure(compactionTaskId, msgBuilder.toString());
}
}
}
Loading