Skip to content

Commit

Permalink
[JSPWIKI-1178] Address potential deadlock with JDK 21 Virtual Threads.
Browse files Browse the repository at this point in the history
Refactored synchronized blocks/methods containing blocking operations to prevent potential deadlocks introduced by the upcoming Virtual Threads feature in JDK 21.
  • Loading branch information
arturobernalg committed Oct 11, 2023
1 parent 59f1678 commit eb77ea5
Show file tree
Hide file tree
Showing 35 changed files with 1,903 additions and 872 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.util.Set;
import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.locks.ReentrantLock;

/**
* A singleton class that manages the addition and removal of WikiEvent listeners to a event source, as well as the firing of events
Expand Down Expand Up @@ -141,6 +142,17 @@ public final class WikiEventManager {
/* Singleton instance of the WikiEventManager. */
private static WikiEventManager c_instance;

/**
* A lock used to ensure thread safety when accessing shared resources.
* This lock provides more flexibility and capabilities than the intrinsic locking mechanism,
* such as the ability to attempt to acquire a lock with a timeout, or to interrupt a thread
* waiting to acquire a lock.
*
* @see java.util.concurrent.locks.ReentrantLock
*/
private static final ReentrantLock lock = new ReentrantLock();


/** Constructor for a WikiEventManager. */
private WikiEventManager() {
c_instance = this;
Expand All @@ -154,10 +166,15 @@ private WikiEventManager() {
* @return A shared instance of the WikiEventManager
*/
public static WikiEventManager getInstance() {
if( c_instance == null ) {
synchronized( WikiEventManager.class ) {
return new WikiEventManager();
// start up any post-instantiation services here
if (c_instance == null) {
lock.lock();
try {
if (c_instance == null) {
c_instance = new WikiEventManager();
// start up any post-instantiation services here
}
} finally {
lock.unlock();
}
}
return c_instance;
Expand Down Expand Up @@ -242,7 +259,8 @@ public static boolean removeWikiEventListener( final WikiEventListener listener
// get the Map.entry object for the entire Map, then check match on entry (listener)
final WikiEventManager mgr = getInstance();
final Map< Object, WikiEventDelegate > sources = Collections.synchronizedMap( mgr.getDelegates() );
synchronized( sources ) {
lock.lock();
try {
// get an iterator over the Map.Enty objects in the map
for( final Map.Entry< Object, WikiEventDelegate > entry : sources.entrySet() ) {
// the entry value is the delegate
Expand All @@ -253,16 +271,24 @@ public static boolean removeWikiEventListener( final WikiEventListener listener
removed = true; // was removed
}
}
} finally {
lock.unlock();
}
return removed;
}

private void removeDelegates() {
synchronized( m_delegates ) {
lock.lock();
try {
m_delegates.clear();
} finally {
lock.unlock();
}
synchronized( m_preloadCache ) {
lock.lock();
try {
m_preloadCache.clear();
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -315,7 +341,8 @@ private Map< Object, WikiEventDelegate > getDelegates() {
* @return the WikiEventDelegate.
*/
private WikiEventDelegate getDelegateFor( final Object client ) {
synchronized( m_delegates ) {
lock.lock();
try {
if( client == null || client instanceof Class ) { // then preload the cache
final WikiEventDelegate delegate = new WikiEventDelegate( client );
m_preloadCache.add( delegate );
Expand All @@ -342,6 +369,8 @@ private WikiEventDelegate getDelegateFor( final Object client ) {
m_delegates.put( client, delegate );
}
return delegate;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -386,7 +415,8 @@ private static final class WikiEventDelegate {
* @throws java.lang.UnsupportedOperationException if any attempt is made to modify the Set
*/
public Set< WikiEventListener > getWikiEventListeners() {
synchronized( m_listenerList ) {
lock.lock();
try {
final TreeSet< WikiEventListener > set = new TreeSet<>( new WikiEventListenerComparator() );
for( final WeakReference< WikiEventListener > wikiEventListenerWeakReference : m_listenerList ) {
final WikiEventListener l = wikiEventListenerWeakReference.get();
Expand All @@ -396,6 +426,8 @@ public Set< WikiEventListener > getWikiEventListeners() {
}

return Collections.unmodifiableSet( set );
} finally {
lock.unlock();
}
}

Expand All @@ -406,13 +438,16 @@ public Set< WikiEventListener > getWikiEventListeners() {
* @return true if the listener was added (i.e., it was not already in the list and was added)
*/
public boolean addWikiEventListener( final WikiEventListener listener ) {
synchronized( m_listenerList ) {
lock.lock();
try {
final boolean listenerAlreadyContained = m_listenerList.stream()
.map( WeakReference::get )
.anyMatch( ref -> ref == listener );
if( !listenerAlreadyContained ) {
return m_listenerList.add( new WeakReference<>( listener ) );
}
} finally {
lock.unlock();
}
return false;
}
Expand All @@ -424,14 +459,17 @@ public boolean addWikiEventListener( final WikiEventListener listener ) {
* @return true if the listener was removed (i.e., it was actually in the list and was removed)
*/
public boolean removeWikiEventListener( final WikiEventListener listener ) {
synchronized( m_listenerList ) {
lock.lock();
try {
for( final Iterator< WeakReference< WikiEventListener > > i = m_listenerList.iterator(); i.hasNext(); ) {
final WikiEventListener l = i.next().get();
if( l == listener ) {
i.remove();
return true;
}
}
} finally {
lock.unlock();
}

return false;
Expand All @@ -441,8 +479,11 @@ public boolean removeWikiEventListener( final WikiEventListener listener ) {
* Returns true if there are one or more listeners registered with this instance.
*/
public boolean isListening() {
synchronized( m_listenerList ) {
lock.lock();
try {
return !m_listenerList.isEmpty();
} finally {
lock.unlock();
}
}

Expand All @@ -452,7 +493,8 @@ public boolean isListening() {
public void fireEvent( final WikiEvent event ) {
boolean needsCleanup = false;
try {
synchronized( m_listenerList ) {
lock.lock();
try {
for( final WeakReference< WikiEventListener > wikiEventListenerWeakReference : m_listenerList ) {
final WikiEventListener listener = wikiEventListenerWeakReference.get();
if( listener != null ) {
Expand All @@ -472,6 +514,8 @@ public void fireEvent( final WikiEvent event ) {
}
}

} finally {
lock.unlock();
}
} catch( final ConcurrentModificationException e ) {
// We don't die, we just don't do notifications in that case.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.String.format;

Expand Down Expand Up @@ -87,7 +88,18 @@ public class KendraSearchProvider implements SearchProvider {
private static final String PROP_KENDRA_INDEXDELAY = "jspwiki.kendra.indexdelay";
private static final String PROP_KENDRA_INITIALDELAY = "jspwiki.kendra.initialdelay";

/**
* A lock used to ensure thread safety when accessing shared resources.
* This lock provides more flexibility and capabilities than the intrinsic locking mechanism,
* such as the ability to attempt to acquire a lock with a timeout, or to interrupt a thread
* waiting to acquire a lock.
*
* @see java.util.concurrent.locks.ReentrantLock
*/
private final ReentrantLock lock;

public KendraSearchProvider() {
lock = new ReentrantLock();
}

/**
Expand Down Expand Up @@ -339,14 +351,17 @@ private void doPartialReindex() {
}
LOG.debug( "Indexing updated pages. Please wait ..." );
final String executionId = startExecution();
synchronized ( updates ) {
lock.lock();
try {
try {
while ( updates.size() > 0 ) {
indexOnePage( updates.remove( 0 ), executionId );
}
} finally {
stopExecution();
}
} finally {
lock.unlock();
}
}

Expand Down
49 changes: 41 additions & 8 deletions jspwiki-main/src/main/java/org/apache/wiki/WatchDog.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;


/**
Expand Down Expand Up @@ -57,6 +58,16 @@ public final class WatchDog {
private static final Map< Integer, WeakReference< WatchDog > > c_kennel = new ConcurrentHashMap<>();
private static WikiBackgroundThread c_watcherThread;

/**
* A lock used to ensure thread safety when accessing shared resources.
* This lock provides more flexibility and capabilities than the intrinsic locking mechanism,
* such as the ability to attempt to acquire a lock with a timeout, or to interrupt a thread
* waiting to acquire a lock.
*
* @see java.util.concurrent.locks.ReentrantLock
*/
private final ReentrantLock lock;

/**
* Returns the current watchdog for the current thread. This is the preferred method of getting you a Watchdog, since it
* keeps an internal list of Watchdogs for you so that there won't be more than one watchdog per thread.
Expand Down Expand Up @@ -92,11 +103,16 @@ public WatchDog( final Engine engine, final Watchable watch ) {
m_engine = engine;
m_watchable = watch;

synchronized( WatchDog.class ) {
lock = new ReentrantLock();

lock.lock();
try {
if( c_watcherThread == null ) {
c_watcherThread = new WatchDogThread( engine );
c_watcherThread.start();
}
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -136,25 +152,31 @@ private static void scrub() {
* Can be used to enable the WatchDog. Will cause a new Thread to be created, if none was existing previously.
*/
public void enable() {
synchronized( WatchDog.class ) {
lock.lock();
try {
if( !m_enabled ) {
m_enabled = true;
c_watcherThread = new WatchDogThread( m_engine );
c_watcherThread.start();
}
} finally {
lock.unlock();
}
}

/**
* Is used to disable a WatchDog. The watchdog thread is shut down and resources released.
*/
public void disable() {
synchronized( WatchDog.class ) {
lock.lock();
try {
if( m_enabled ) {
m_enabled = false;
c_watcherThread.shutdown();
c_watcherThread = null;
}
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -186,9 +208,12 @@ public void enterState( final String state ) {
*/
public void enterState( final String state, final int expectedCompletionTime ) {
LOG.debug( "{}: Entering state {}, expected completion in {} s", m_watchable.getName(), state, expectedCompletionTime );
synchronized( m_stateStack ) {
lock.lock();
try {
final State st = new State( state, expectedCompletionTime );
m_stateStack.push( st );
} finally {
lock.unlock();
}
}

Expand All @@ -208,7 +233,8 @@ public void exitState() {
*/
public void exitState( final String state ) {
if( !m_stateStack.empty() ) {
synchronized( m_stateStack ) {
lock.lock();
try {
final State st = m_stateStack.peek();
if( state == null || st.getState().equals( state ) ) {
m_stateStack.pop();
Expand All @@ -218,6 +244,8 @@ public void exitState( final String state ) {
// FIXME: should actually go and fix things for that
LOG.error( "exitState() called before enterState()" );
}
} finally {
lock.unlock();
}
} else {
LOG.warn( "Stack for " + m_watchable.getName() + " is empty!" );
Expand All @@ -244,8 +272,8 @@ public boolean isWatchableAlive() {

private void check() {
LOG.debug( "Checking watchdog '{}'", m_watchable.getName() );

synchronized( m_stateStack ) {
lock.lock();
try {
if( !m_stateStack.empty() ) {
final State st = m_stateStack.peek();
final long now = System.currentTimeMillis();
Expand All @@ -261,6 +289,8 @@ private void check() {
} else {
LOG.warn( "Stack for " + m_watchable.getName() + " is empty!" );
}
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -302,14 +332,17 @@ private void dumpStackTraceForWatchable() {
*/
@Override
public String toString() {
synchronized( m_stateStack ) {
lock.lock();
try {
String state = "Idle";

if( !m_stateStack.empty() ) {
final State st = m_stateStack.peek();
state = st.getState();
}
return "WatchDog state=" + state;
} finally {
lock.unlock();
}
}

Expand Down
Loading

0 comments on commit eb77ea5

Please sign in to comment.