diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java index bb85f976b8e..5be27f91390 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java @@ -28,6 +28,8 @@ import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.tasks.MinionTaskProgressStats; import org.apache.pinot.spi.tasks.StatusEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -35,8 +37,9 @@ */ @ThreadSafe public class MinionProgressObserver extends DefaultMinionEventObserver { + private static final Logger LOGGER = LoggerFactory.getLogger(MinionProgressObserver.class); - protected MinionTaskState _taskState; + protected MinionTaskState _taskState = MinionTaskState.UNKNOWN; protected final Map _stageTimes = new HashMap<>(); protected String _stage; protected long _startTs; @@ -61,9 +64,12 @@ public synchronized void notifyTaskStart(PinotTaskConfig pinotTaskConfig) { @Override public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullable Object progress) { + String progressMessage = null; _taskState = MinionTaskState.IN_PROGRESS; if (progress instanceof StatusEntry) { - setStageStats((StatusEntry) progress); + StatusEntry statusEntry = (StatusEntry) progress; + progressMessage = statusEntry.getStatus(); + setStageStats(statusEntry); } else if (progress instanceof MinionTaskProgressStats) { MinionTaskProgressStats stats = (MinionTaskProgressStats) progress; if (stats.getInputUnits() != null && !stats.getInputUnits().isEmpty()) { @@ -77,16 +83,20 @@ public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullab } // Only one progress log must be recorded at once and should not be bulked if (stats.getProgressLogs() != null && stats.getProgressLogs().size() == 1) { + progressMessage = stats.getProgressLogs().get(0).getStatus(); setStageStats(stats.getProgressLogs().get(0)); } } else { - String progressMessage = progress == null ? "" : progress.toString(); + progressMessage = progress == null ? "" : progress.toString(); setStageStats(new StatusEntry.Builder().status(progressMessage).build()); } + if (LOGGER.isDebugEnabled() && progressMessage != null) { + LOGGER.debug("Update progress: {} for task: {}", progressMessage, pinotTaskConfig.getTaskId()); + } } - @Nullable @Override + @Nullable public synchronized List getProgress() { MinionTaskProgressStats minionTaskProgressStats = _progressManager.getTaskProgress(_taskId); List progressLog = new ArrayList<>(); diff --git a/pinot-minion/src/test/java/org/apache/pinot/minion/MinionTestUtils.java b/pinot-minion/src/test/java/org/apache/pinot/minion/MinionTestUtils.java index 77564897827..a7f740d285c 100644 --- a/pinot-minion/src/test/java/org/apache/pinot/minion/MinionTestUtils.java +++ b/pinot-minion/src/test/java/org/apache/pinot/minion/MinionTestUtils.java @@ -24,6 +24,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.minion.event.DefaultMinionTaskProgressManager; import org.apache.pinot.minion.event.MinionProgressObserver; +import org.apache.pinot.spi.tasks.MinionTaskProgressManager; public class MinionTestUtils { @@ -37,6 +38,16 @@ public static MinionProgressObserver getMinionProgressObserver() { return progressObserver; } + public static MinionProgressObserver getMinionProgressObserver(int progressLimit) { + MinionProgressObserver progressObserver = new MinionProgressObserver(); + MinionConf conf = new MinionConf(); + conf.setProperty(DefaultMinionTaskProgressManager.MAX_NUM_STATUS_TO_TRACK, progressLimit); + MinionTaskProgressManager progressManager = new DefaultMinionTaskProgressManager(); + progressManager.init(conf); + progressObserver.init(progressManager); + return progressObserver; + } + public static PinotTaskConfig getPinotTaskConfig(String taskId) { Map taskConfigs = new HashMap<>(); taskConfigs.put("TASK_ID", taskId != null ? taskId : UUID.randomUUID().toString()); diff --git a/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java b/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java index e2424f06e29..b0354a2c32c 100644 --- a/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java +++ b/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java @@ -32,7 +32,7 @@ public class MinionProgressObserverTest { @Test public void testNotifyProgressStatus() { - MinionProgressObserver observer = MinionTestUtils.getMinionProgressObserver(); + MinionProgressObserver observer = MinionTestUtils.getMinionProgressObserver(3); PinotTaskConfig pinotTaskConfig = MinionTestUtils.getPinotTaskConfig(null); observer.notifyTaskStart(pinotTaskConfig); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/StatusEntry.java b/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/StatusEntry.java index f85c6960aca..4307b8b2617 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/StatusEntry.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/tasks/StatusEntry.java @@ -47,6 +47,12 @@ public LogLevel getLevel() { return _level; } + @Override + public String toString() { + return "StatusEntry{" + "_ts=" + _ts + ", _level=" + _level + ", _stage='" + _stage + '\'' + + ", _status='" + _status + '\'' + '}'; + } + public static class Builder { private long _ts; private LogLevel _level = LogLevel.INFO;