Skip to content

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 committed Jan 13, 2025
1 parent 16b155b commit c180906
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.tasks.MinionTaskProgressStats;
import org.apache.pinot.spi.tasks.StatusEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A minion event observer that can track task progress status in memory.
*/
@ThreadSafe
public class MinionProgressObserver extends DefaultMinionEventObserver {
private static final Logger LOGGER = LoggerFactory.getLogger(MinionProgressObserver.class);

protected MinionTaskState _taskState;
protected MinionTaskState _taskState = MinionTaskState.UNKNOWN;
protected final Map<String, MinionTaskProgressStats.Timer> _stageTimes = new HashMap<>();
protected String _stage;
protected long _startTs;
Expand All @@ -61,9 +64,12 @@ public synchronized void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {

@Override
public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullable Object progress) {
String progressMessage = null;
_taskState = MinionTaskState.IN_PROGRESS;
if (progress instanceof StatusEntry) {
setStageStats((StatusEntry) progress);
StatusEntry statusEntry = (StatusEntry) progress;
progressMessage = statusEntry.getStatus();
setStageStats(statusEntry);
} else if (progress instanceof MinionTaskProgressStats) {
MinionTaskProgressStats stats = (MinionTaskProgressStats) progress;
if (stats.getInputUnits() != null && !stats.getInputUnits().isEmpty()) {
Expand All @@ -77,16 +83,20 @@ public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, @Nullab
}
// Only one progress log must be recorded at once and should not be bulked
if (stats.getProgressLogs() != null && stats.getProgressLogs().size() == 1) {
progressMessage = stats.getProgressLogs().get(0).getStatus();
setStageStats(stats.getProgressLogs().get(0));
}
} else {
String progressMessage = progress == null ? "" : progress.toString();
progressMessage = progress == null ? "" : progress.toString();
setStageStats(new StatusEntry.Builder().status(progressMessage).build());
}
if (LOGGER.isDebugEnabled() && progressMessage != null) {
LOGGER.debug("Update progress: {} for task: {}", progressMessage, pinotTaskConfig.getTaskId());
}
}

@Nullable
@Override
@Nullable
public synchronized List<StatusEntry> getProgress() {
MinionTaskProgressStats minionTaskProgressStats = _progressManager.getTaskProgress(_taskId);
List<StatusEntry> progressLog = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.minion.event.DefaultMinionTaskProgressManager;
import org.apache.pinot.minion.event.MinionProgressObserver;
import org.apache.pinot.spi.tasks.MinionTaskProgressManager;


public class MinionTestUtils {
Expand All @@ -37,6 +38,16 @@ public static MinionProgressObserver getMinionProgressObserver() {
return progressObserver;
}

public static MinionProgressObserver getMinionProgressObserver(int progressLimit) {
MinionProgressObserver progressObserver = new MinionProgressObserver();
MinionConf conf = new MinionConf();
conf.setProperty(DefaultMinionTaskProgressManager.MAX_NUM_STATUS_TO_TRACK, progressLimit);
MinionTaskProgressManager progressManager = new DefaultMinionTaskProgressManager();
progressManager.init(conf);
progressObserver.init(progressManager);
return progressObserver;
}

public static PinotTaskConfig getPinotTaskConfig(String taskId) {
Map<String, String> taskConfigs = new HashMap<>();
taskConfigs.put("TASK_ID", taskId != null ? taskId : UUID.randomUUID().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public class MinionProgressObserverTest {
@Test
public void testNotifyProgressStatus() {
MinionProgressObserver observer = MinionTestUtils.getMinionProgressObserver();
MinionProgressObserver observer = MinionTestUtils.getMinionProgressObserver(3);
PinotTaskConfig pinotTaskConfig = MinionTestUtils.getPinotTaskConfig(null);

observer.notifyTaskStart(pinotTaskConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public LogLevel getLevel() {
return _level;
}

@Override
public String toString() {
return "StatusEntry{" + "_ts=" + _ts + ", _level=" + _level + ", _stage='" + _stage + '\''
+ ", _status='" + _status + '\'' + '}';
}

public static class Builder {
private long _ts;
private LogLevel _level = LogLevel.INFO;
Expand Down

0 comments on commit c180906

Please sign in to comment.