Skip to content

Commit

Permalink
Remove DisposableBean implements from SimpleJobService (#6035)
Browse files Browse the repository at this point in the history
This can be removed because SCDF does not manage internal jobs,
but rather launches boot apps that use Spring Batch

resolves #6034
  • Loading branch information
cppwfs authored Oct 30, 2024
1 parent 1a10c23 commit c2b0aed
Showing 1 changed file with 1 addition and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,29 @@
*/
package org.springframework.cloud.dataflow.server.batch;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.batch.core.launch.NoSuchJobInstanceException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.dao.ExecutionContextDao;
import org.springframework.batch.core.step.NoSuchStepException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
Expand All @@ -61,7 +49,7 @@
* @author Glenn Renfro
* @author Corneil du Plessis
*/
public class SimpleJobService implements JobService, DisposableBean {
public class SimpleJobService implements JobService {

private static final Logger logger = LoggerFactory.getLogger(SimpleJobService.class);

Expand All @@ -78,8 +66,6 @@ public class SimpleJobService implements JobService, DisposableBean {

private final ExecutionContextDao executionContextDao;

private Collection<JobExecution> activeExecutions = Collections.synchronizedList(new ArrayList<JobExecution>());

private JobOperator jobOperator;


Expand Down Expand Up @@ -339,62 +325,4 @@ private void checkJobExists(String jobName) throws NoSuchJobException {
throw new NoSuchJobException("No Job with that name either current or historic: [" + jobName + "]");
}
}

/**
* Stop all the active jobs and wait for them (up to a time out) to finish processing.
*/
@Override
public void destroy() throws Exception {

Exception firstException = null;

for (JobExecution jobExecution : activeExecutions) {
try {
if (jobExecution.isRunning()) {
stop(jobExecution.getId());
}
} catch (JobExecutionNotRunningException e) {
logger.info("JobExecution is not running so it cannot be stopped");
} catch (Exception e) {
logger.error("Unexpected exception stopping JobExecution", e);
if (firstException == null) {
firstException = e;
}
}
}

int count = 0;
int maxCount = (shutdownTimeout + 1000) / 1000;
while (!activeExecutions.isEmpty() && ++count < maxCount) {
logger.error("Waiting for " + activeExecutions.size() + " active executions to complete");
removeInactiveExecutions();
Thread.sleep(1000L);
}

if (firstException != null) {
throw firstException;
}

}

/**
* Check all the active executions and see if they are still actually running. Remove the
* ones that have completed.
*/
@Scheduled(fixedDelay = 60000)
public void removeInactiveExecutions() {

for (Iterator<JobExecution> iterator = activeExecutions.iterator(); iterator.hasNext(); ) {
JobExecution jobExecution = iterator.next();
try {
jobExecution = getJobExecution(jobExecution.getId());
} catch (NoSuchJobExecutionException e) {
logger.error("Unexpected exception loading JobExecution", e);
}
if (!jobExecution.isRunning()) {
iterator.remove();
}
}

}
}

0 comments on commit c2b0aed

Please sign in to comment.