-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[JBPM-10088] Removing timers when rollback #2365
Changes from all commits
27c29b1
60f0b24
61b735b
6c364e7
7dda523
ab096b6
475164c
5e2aed8
5f98688
583e8fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,7 @@ | |
import javax.ejb.Timer; | ||
import javax.ejb.TimerConfig; | ||
import javax.ejb.TimerHandle; | ||
import javax.ejb.TimerService; | ||
import javax.ejb.TransactionAttribute; | ||
import javax.ejb.TransactionAttributeType; | ||
|
||
|
@@ -73,7 +74,7 @@ public class EJBTimerScheduler { | |
private ConcurrentMap<String, TimerJobInstance> localCache = new ConcurrentHashMap<String, TimerJobInstance>(); | ||
|
||
@Resource | ||
protected javax.ejb.TimerService timerService; | ||
protected TimerService timerService; | ||
|
||
@Resource | ||
protected SessionContext ctx; | ||
|
@@ -111,7 +112,7 @@ public void executeTimerJob(Timer timer) { | |
Thread.currentThread().interrupt(); | ||
} | ||
try { | ||
invokeTransaction(this::executeTimerJobInstance, timerJobInstance); | ||
executeTimerJobInstance(timerJobInstance); | ||
} catch (Exception e) { | ||
recoverTimerJobInstance(timerJob, timer, e); | ||
} | ||
|
@@ -186,7 +187,12 @@ private interface Transaction<I> { | |
|
||
@TransactionAttribute(value = TransactionAttributeType.REQUIRES_NEW) | ||
public <I> void transaction(Transaction<I> operation, I item) throws Exception { | ||
operation.doWork(item); | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The transaction should be rollback if there is any exception |
||
operation.doWork(item); | ||
} catch (Exception transactionEx) { | ||
ctx.setRollbackOnly(); | ||
throw transactionEx; | ||
} | ||
} | ||
|
||
private <I> void invokeTransaction (Transaction<I> operation, I item) throws Exception { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,49 +28,42 @@ | |
import javax.naming.InitialContext; | ||
import javax.naming.NamingException; | ||
import javax.persistence.EntityManager; | ||
import javax.persistence.EntityManagerFactory; | ||
|
||
import org.drools.core.time.InternalSchedulerService; | ||
import org.drools.core.time.Job; | ||
import org.drools.core.time.JobContext; | ||
import org.drools.core.time.JobHandle; | ||
import org.drools.core.time.TimerService; | ||
import org.drools.core.time.Trigger; | ||
import org.drools.core.time.impl.TimerJobInstance; | ||
import org.drools.persistence.api.TransactionManager; | ||
import org.drools.persistence.api.TransactionManagerFactory; | ||
import org.drools.persistence.jta.JtaTransactionManager; | ||
import org.jbpm.process.core.timer.GlobalSchedulerService; | ||
import org.jbpm.process.core.timer.JobNameHelper; | ||
import org.jbpm.process.core.timer.NamedJobContext; | ||
import org.jbpm.process.core.timer.SchedulerServiceInterceptor; | ||
import org.jbpm.process.core.timer.impl.DelegateSchedulerServiceInterceptor; | ||
import org.jbpm.process.core.timer.impl.GlobalTimerService; | ||
import org.jbpm.process.core.timer.impl.GlobalTimerService.GlobalJobHandle; | ||
import org.jbpm.runtime.manager.impl.SimpleRuntimeEnvironment; | ||
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager; | ||
import org.jbpm.runtime.manager.impl.jpa.TimerMappingInfo; | ||
import org.kie.internal.runtime.manager.InternalRuntimeManager; | ||
import org.kie.internal.runtime.manager.RuntimeEnvironment; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
||
public class EjbSchedulerService implements GlobalSchedulerService { | ||
private static final Logger logger = LoggerFactory.getLogger(EjbSchedulerService.class); | ||
|
||
private static final Boolean TRANSACTIONAL = Boolean.parseBoolean(System.getProperty("org.jbpm.ejb.timer.tx", "true")); | ||
|
||
private AtomicLong idCounter = new AtomicLong(); | ||
private TimerService globalTimerService; | ||
private GlobalTimerService globalTimerService; | ||
private EJBTimerScheduler scheduler; | ||
|
||
private SchedulerServiceInterceptor interceptor = new DelegateSchedulerServiceInterceptor(this); | ||
|
||
|
||
@Override | ||
public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) { | ||
long id = idCounter.getAndIncrement(); | ||
String jobName = getJobName(ctx, id); | ||
EjbGlobalJobHandle jobHandle = new EjbGlobalJobHandle(id, jobName, ((GlobalTimerService) globalTimerService).getTimerServiceId()); | ||
EjbGlobalJobHandle jobHandle = new EjbGlobalJobHandle(id, jobName, globalTimerService.getTimerServiceId()); | ||
|
||
TimerJobInstance jobInstance = null; | ||
// check if given timer job is marked as new timer meaning it was never scheduled before, | ||
|
@@ -89,7 +82,7 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) { | |
ctx, | ||
trigger, | ||
jobHandle, | ||
(InternalSchedulerService) globalTimerService); | ||
globalTimerService); | ||
|
||
jobHandle.setTimerJobInstance((TimerJobInstance) jobInstance); | ||
interceptor.internalSchedule(jobInstance); | ||
|
@@ -100,10 +93,6 @@ public JobHandle scheduleJob(Job job, JobContext ctx, Trigger trigger) { | |
public boolean removeJob(JobHandle jobHandle) { | ||
String uuid = ((EjbGlobalJobHandle) jobHandle).getUuid(); | ||
final Timer ejbTimer = getEjbTimer(getTimerMappinInfo(uuid)); | ||
if (TRANSACTIONAL && ejbTimer == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Timer might be expired (getEJBTimer will return null) and we still want try to remove it just in case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch! |
||
logger.warn("EJB timer is null for uuid {} and transactional flag is enabled", uuid); | ||
return false; | ||
} | ||
boolean result = scheduler.removeJob(jobHandle, ejbTimer); | ||
logger.debug("Remove job returned {}", result); | ||
return result; | ||
|
@@ -113,7 +102,6 @@ private TimerJobInstance getTimerJobInstance (String uuid) { | |
return unwrapTimerJobInstance(getEjbTimer(getTimerMappinInfo(uuid))); | ||
} | ||
|
||
|
||
@Override | ||
public TimerJobInstance getTimerJobInstance(long processInstanceId, long timerId) { | ||
return unwrapTimerJobInstance(getEjbTimer(getTimerMappinInfo(processInstanceId, timerId))); | ||
|
@@ -124,10 +112,9 @@ private Timer getEjbTimer(TimerMappingInfo timerMappingInfo) { | |
if(timerMappingInfo == null || timerMappingInfo.getInfo() == null) { | ||
return null; | ||
} | ||
byte[] data = timerMappingInfo.getInfo(); | ||
return ((TimerHandle) new ObjectInputStream(new ByteArrayInputStream(data)).readObject()).getTimer(); | ||
return ((TimerHandle) new ObjectInputStream(new ByteArrayInputStream(timerMappingInfo.getInfo())).readObject()).getTimer(); | ||
} catch (Exception e) { | ||
logger.warn("wast not able to deserialize info field from timer info for uuid"); | ||
logger.warn("Problem retrieving timer for uuid {}", timerMappingInfo.getUuid(), e); | ||
return null; | ||
} | ||
} | ||
|
@@ -146,30 +133,17 @@ private TimerMappingInfo getTimerMappinInfo(long processInstanceId, long timerId | |
} | ||
|
||
private TimerMappingInfo getTimerMappingInfo(Function<EntityManager, List<TimerMappingInfo>> func) { | ||
InternalRuntimeManager manager = ((GlobalTimerService) globalTimerService).getRuntimeManager(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When using CMT, I do not think we need to create a different transaction here anymore |
||
String pu = ((InternalRuntimeManager) manager).getDeploymentDescriptor().getPersistenceUnit(); | ||
EntityManagerFactory emf = EntityManagerFactoryManager.get().getOrCreate(pu); | ||
EntityManager em = emf.createEntityManager(); | ||
JtaTransactionManager tm = (JtaTransactionManager) TransactionManagerFactory.get().newTransactionManager(); | ||
boolean txOwner = false; | ||
EntityManager em = EntityManagerFactoryManager.get() | ||
.getOrCreate(globalTimerService.getRuntimeManager() | ||
.getDeploymentDescriptor().getPersistenceUnit()) | ||
.createEntityManager(); | ||
try { | ||
if (tm != null && tm.getStatus() == TransactionManager.STATUS_ROLLEDBACK) { | ||
txOwner = tm.begin(); | ||
} | ||
List<TimerMappingInfo> info = func.apply(em); | ||
if (!info.isEmpty()) { | ||
return info.get(0); | ||
} else { | ||
return null; | ||
} | ||
|
||
return !info.isEmpty() ? info.get(0) : null; | ||
} catch (Exception ex) { | ||
logger.warn("Error getting mapping info ",ex); | ||
return null; | ||
} finally { | ||
if (tm != null) { | ||
tm.commit(txOwner); | ||
} | ||
em.close(); | ||
} | ||
} | ||
|
@@ -200,7 +174,7 @@ public void internalSchedule(TimerJobInstance timerJobInstance) { | |
|
||
@Override | ||
public void initScheduler(TimerService timerService) { | ||
this.globalTimerService = timerService; | ||
this.globalTimerService = (GlobalTimerService)timerService; | ||
try { | ||
this.scheduler = InitialContext.doLookup("java:module/EJBTimerScheduler"); | ||
} catch (NamingException e) { | ||
|
@@ -211,18 +185,17 @@ public void initScheduler(TimerService timerService) { | |
@Override | ||
public void shutdown() { | ||
// managed by container - no op | ||
|
||
} | ||
|
||
@Override | ||
public JobHandle buildJobHandleForContext(NamedJobContext ctx) { | ||
|
||
return new EjbGlobalJobHandle(-1, getJobName(ctx, -1L), ((GlobalTimerService) globalTimerService).getTimerServiceId()); | ||
return new EjbGlobalJobHandle(-1, getJobName(ctx, -1L), globalTimerService.getTimerServiceId()); | ||
} | ||
|
||
@Override | ||
public boolean isTransactional() { | ||
return TRANSACTIONAL; | ||
return true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is not point in keeping the variable. CMT is always transactional. |
||
} | ||
|
||
@Override | ||
|
@@ -237,11 +210,18 @@ public void setInterceptor(SchedulerServiceInterceptor interceptor) { | |
|
||
@Override | ||
public boolean isValid(GlobalJobHandle jobHandle) { | ||
|
||
return true; | ||
return true; | ||
} | ||
|
||
protected String getJobName(JobContext ctx, long id) { | ||
return JobNameHelper.getJobName(ctx, id); | ||
} | ||
|
||
@Override | ||
public | ||
void setEnvironment(RuntimeEnvironment environment) { | ||
if (environment instanceof SimpleRuntimeEnvironment) { | ||
((SimpleRuntimeEnvironment)environment).addToEnvironment("IS_TIMER_CMT", true); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to tell GlobalJpaTimerJobInsance that we are using CMT |
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not need to create a new transaction here. Also PersistableRunner is on the middle, handling the transaction itself. Because that transaction is not longer usable once PersistableRunner commits it, we need to create a new one to execute recoverTimerJob.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
before this was provoking an exception at setRollbackOnly call in the new end-2-end test, because there was no transaction