Skip to content

Commit

Permalink
fix: Fixed setTimeout type casting issue in FlinkEngineExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
GSHF committed Dec 29, 2024
1 parent 04be2c8 commit 12b63a2
Showing 1 changed file with 45 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,28 +62,54 @@ public void init(JobExecutionRequest jobExecutionRequest, Logger logger, Configu

@Override
public void execute() throws Exception {
try {
String command = buildCommand();
logger.info("flink task command: {}", command);
ProcessResult result = shellCommandProcess.run(command);

// Check exit code and set execution result
if (result.getExitStatusCode() == ExecutionStatus.SUCCESS.getCode()) {
processResult.setExitStatusCode(ExecutionStatus.SUCCESS.getCode());
processResult.setProcessId(Integer.valueOf(String.valueOf(jobExecutionRequest.getJobExecutionId())));
logger.info("Flink job executed successfully");
} else {
int retryCount = 0;
int maxRetries = configurations.getInt("flink.task.max.retries", 3);
long retryInterval = configurations.getLong("flink.task.retry.interval", 10000);
long timeout = configurations.getLong("flink.task.timeout", 3600000); // 1 hour default

while (retryCount <= maxRetries) {
try {
String command = buildCommand();
logger.info("flink task command: {}", command);

// Set timeout for the process
((FlinkCommandProcess)shellCommandProcess).setTimeout(timeout);
ProcessResult result = shellCommandProcess.run(command);

if (result.getExitStatusCode() == ExecutionStatus.SUCCESS.getCode()) {
processResult.setExitStatusCode(ExecutionStatus.SUCCESS.getCode());
processResult.setProcessId(Integer.valueOf(String.valueOf(jobExecutionRequest.getJobExecutionId())));
logger.info("Flink job executed successfully");
return;
} else {
String errorMsg = String.format("Flink job execution failed with exit code: %d", result.getExitStatusCode());
logger.error(errorMsg);

if (retryCount < maxRetries) {
logger.info("Retrying... Attempt {} of {}", retryCount + 1, maxRetries);
Thread.sleep(retryInterval);
retryCount++;
continue;
}

processResult.setExitStatusCode(ExecutionStatus.FAILURE.getCode());
processResult.setProcessId(Integer.valueOf(String.valueOf(jobExecutionRequest.getJobExecutionId())));
throw new RuntimeException(errorMsg);
}
} catch (Exception e) {
logger.error("flink task error", e);

if (retryCount < maxRetries) {
logger.info("Retrying... Attempt {} of {}", retryCount + 1, maxRetries);
Thread.sleep(retryInterval);
retryCount++;
continue;
}

processResult.setExitStatusCode(ExecutionStatus.FAILURE.getCode());
processResult.setProcessId(Integer.valueOf(String.valueOf(jobExecutionRequest.getJobExecutionId())));
String errorMsg = String.format("Flink job execution failed with exit code: %d", result.getExitStatusCode());
logger.error(errorMsg);
throw new RuntimeException(errorMsg);
throw e;
}
} catch (Exception e) {
logger.error("flink task error", e);
processResult.setExitStatusCode(ExecutionStatus.FAILURE.getCode());
processResult.setProcessId(Integer.valueOf(String.valueOf(jobExecutionRequest.getJobExecutionId())));
throw e;
}
}

Expand Down

0 comments on commit 12b63a2

Please sign in to comment.