Skip to content

Commit

Permalink
Use custom Synchronizer class to manage ReentrantLock synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
arturobernalg committed Nov 26, 2023
1 parent 5f52d4d commit 659b8a5
Show file tree
Hide file tree
Showing 39 changed files with 826 additions and 964 deletions.
7 changes: 7 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ specific language governing permissions and limitations
under the License.
-->

**2023-11-26 Arturo Bernal (abernal AT apache DOT org)**

* _2.12.2-git-08_
*
* [JSPWIKI-1178](https://issues.apache.org/jira/browse/JSPWIKI-1178) - Deadlock with Java Virtual Threads


**2023-11-25 Juan Pablo Santos (juanpablo AT apache DOT org)**

* _2.12.2-git-09_
Expand Down
2 changes: 1 addition & 1 deletion jspwiki-api/src/main/java/org/apache/wiki/api/Release.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class Release {
* <p>
* If the build identifier is empty, it is not added.
*/
public static final String BUILD = "09";
public static final String BUILD = "10";

/**
* This is the generic version string you should use when printing out the version. It is of
Expand Down
6 changes: 6 additions & 0 deletions jspwiki-event/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jspwiki-util</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wiki.util.Synchronizer;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
Expand All @@ -33,6 +34,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.util.Set;
import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand Down Expand Up @@ -150,7 +152,18 @@ public final class WikiEventManager {
*
* @see java.util.concurrent.locks.ReentrantLock
*/
private static final ReentrantLock lock = new ReentrantLock();
private static final ReentrantLock instanceLock = new ReentrantLock();
private static final ReentrantLock removeWikiEventListenerLock = new ReentrantLock();
private static final ReentrantLock delegatesLockLock = new ReentrantLock();
private static final ReentrantLock preloadCacheLock = new ReentrantLock();
private static final ReentrantLock delegateForLock = new ReentrantLock();
private static final ReentrantLock wikiEventListenersLock = new ReentrantLock();
private static final ReentrantLock wikiEventListenerLock = new ReentrantLock();
private static final ReentrantLock removeWikiEventListenerLock2 = new ReentrantLock();
private static final ReentrantLock isListeningLock = new ReentrantLock();
private static final ReentrantLock fireEventLock = new ReentrantLock();




/** Constructor for a WikiEventManager. */
Expand All @@ -167,15 +180,12 @@ private WikiEventManager() {
*/
public static WikiEventManager getInstance() {
if (c_instance == null) {
lock.lock();
try {
Synchronizer.synchronize(instanceLock, () -> {
if (c_instance == null) {
c_instance = new WikiEventManager();
// start up any post-instantiation services here
}
} finally {
lock.unlock();
}
});
}
return c_instance;
}
Expand Down Expand Up @@ -255,41 +265,28 @@ public static Set<WikiEventListener> getWikiEventListeners( final Object client
* @return true if the listener was found and removed.
*/
public static boolean removeWikiEventListener( final WikiEventListener listener ) {
boolean removed = false;
final AtomicBoolean removed = new AtomicBoolean(false);
// get the Map.entry object for the entire Map, then check match on entry (listener)
final WikiEventManager mgr = getInstance();
final Map< Object, WikiEventDelegate > sources = Collections.synchronizedMap( mgr.getDelegates() );
lock.lock();
try {
Synchronizer.synchronize(removeWikiEventListenerLock, () -> {
// get an iterator over the Map.Enty objects in the map
for( final Map.Entry< Object, WikiEventDelegate > entry : sources.entrySet() ) {
// the entry value is the delegate
final WikiEventDelegate delegate = entry.getValue();

// now see if we can remove the listener from the delegate (delegate may be null because this is a weak reference)
if( delegate != null && delegate.removeWikiEventListener( listener ) ) {
removed = true; // was removed
removed.set(true); // was removed
}
}
} finally {
lock.unlock();
}
return removed;
});
return removed.get();
}

private void removeDelegates() {
lock.lock();
try {
m_delegates.clear();
} finally {
lock.unlock();
}
lock.lock();
try {
m_preloadCache.clear();
} finally {
lock.unlock();
}
Synchronizer.synchronize(delegatesLockLock, m_delegates::clear);
Synchronizer.synchronize(preloadCacheLock, m_preloadCache::clear);
}

public static void shutdown() {
Expand Down Expand Up @@ -340,38 +337,35 @@ private Map< Object, WikiEventDelegate > getDelegates() {
* @param client the client Object, or alternately a Class reference
* @return the WikiEventDelegate.
*/
private WikiEventDelegate getDelegateFor( final Object client ) {
lock.lock();
try {
if( client == null || client instanceof Class ) { // then preload the cache
final WikiEventDelegate delegate = new WikiEventDelegate( client );
m_preloadCache.add( delegate );
m_delegates.put( client, delegate );
private WikiEventDelegate getDelegateFor(final Object client) {
return Synchronizer.synchronize(delegateForLock, () -> {
if (client == null || client instanceof Class) { // then preload the cache
final WikiEventDelegate delegate = new WikiEventDelegate(client);
m_preloadCache.add(delegate);
m_delegates.put(client, delegate);
return delegate;
} else if( !m_preloadCache.isEmpty() ) {
} else if (!m_preloadCache.isEmpty()) {
// then see if any of the cached delegates match the class of the incoming client
for( int i = m_preloadCache.size()-1 ; i >= 0 ; i-- ) { // start with most-recently added
final WikiEventDelegate delegate = m_preloadCache.elementAt( i );
if( delegate.getClientClass() == null || delegate.getClientClass().equals( client.getClass() ) ) {
for (int i = m_preloadCache.size() - 1; i >= 0; i--) { // start with most-recently added
final WikiEventDelegate delegate = m_preloadCache.elementAt(i);
if (delegate.getClientClass() == null || delegate.getClientClass().equals(client.getClass())) {
// we have a hit, so use it, but only on a client we haven't seen before
if( !m_delegates.containsKey( client ) ) {
m_preloadCache.remove( delegate );
m_delegates.put( client, delegate );
if (!m_delegates.containsKey(client)) {
m_preloadCache.remove(delegate);
m_delegates.put(client, delegate);
return delegate;
}
}
}
}
// otherwise treat normally...
WikiEventDelegate delegate = m_delegates.get( client );
if( delegate == null ) {
delegate = new WikiEventDelegate( client );
m_delegates.put( client, delegate );
WikiEventDelegate delegate = m_delegates.get(client);
if (delegate == null) {
delegate = new WikiEventDelegate(client);
m_delegates.put(client, delegate);
}
return delegate;
} finally {
lock.unlock();
}
});
}


Expand Down Expand Up @@ -415,8 +409,7 @@ private static final class WikiEventDelegate {
* @throws java.lang.UnsupportedOperationException if any attempt is made to modify the Set
*/
public Set< WikiEventListener > getWikiEventListeners() {
lock.lock();
try {
return Synchronizer.synchronize(wikiEventListenersLock, () -> {
final TreeSet< WikiEventListener > set = new TreeSet<>( new WikiEventListenerComparator() );
for( final WeakReference< WikiEventListener > wikiEventListenerWeakReference : m_listenerList ) {
final WikiEventListener l = wikiEventListenerWeakReference.get();
Expand All @@ -426,9 +419,7 @@ public Set< WikiEventListener > getWikiEventListeners() {
}

return Collections.unmodifiableSet( set );
} finally {
lock.unlock();
}
});
}

/**
Expand All @@ -438,18 +429,15 @@ public Set< WikiEventListener > getWikiEventListeners() {
* @return true if the listener was added (i.e., it was not already in the list and was added)
*/
public boolean addWikiEventListener( final WikiEventListener listener ) {
lock.lock();
try {
return Synchronizer.synchronize(wikiEventListenerLock, () -> {
final boolean listenerAlreadyContained = m_listenerList.stream()
.map( WeakReference::get )
.anyMatch( ref -> ref == listener );
if( !listenerAlreadyContained ) {
return m_listenerList.add( new WeakReference<>( listener ) );
}
} finally {
lock.unlock();
}
return false;
return false;
});
}

/**
Expand All @@ -459,67 +447,54 @@ public boolean addWikiEventListener( final WikiEventListener listener ) {
* @return true if the listener was removed (i.e., it was actually in the list and was removed)
*/
public boolean removeWikiEventListener( final WikiEventListener listener ) {
lock.lock();
try {
for( final Iterator< WeakReference< WikiEventListener > > i = m_listenerList.iterator(); i.hasNext(); ) {
return Synchronizer.synchronize(removeWikiEventListenerLock2, () -> {
for (final Iterator<WeakReference<WikiEventListener>> i = m_listenerList.iterator(); i.hasNext(); ) {
final WikiEventListener l = i.next().get();
if( l == listener ) {
if (l == listener) {
i.remove();
return true;
}
}
} finally {
lock.unlock();
}

return false;
return false;
});
}

/**
* Returns true if there are one or more listeners registered with this instance.
*/
public boolean isListening() {
lock.lock();
try {
return !m_listenerList.isEmpty();
} finally {
lock.unlock();
}
return Synchronizer.synchronize(isListeningLock, () -> !m_listenerList.isEmpty());
}

/**
* Notify all listeners having a registered interest in change events of the supplied WikiEvent.
*/
public void fireEvent( final WikiEvent event ) {
boolean needsCleanup = false;
public void fireEvent(final WikiEvent event) {
final AtomicBoolean needsCleanup = new AtomicBoolean(false);
try {
lock.lock();
try {
for( final WeakReference< WikiEventListener > wikiEventListenerWeakReference : m_listenerList ) {
Synchronizer.synchronize(fireEventLock, () -> {
for (final WeakReference<WikiEventListener> wikiEventListenerWeakReference : m_listenerList) {
final WikiEventListener listener = wikiEventListenerWeakReference.get();
if( listener != null ) {
listener.actionPerformed( event );
if (listener != null) {
listener.actionPerformed(event);
} else {
needsCleanup = true;
needsCleanup.set(true);
}
}

// Remove all such listeners which have expired
if( needsCleanup ) {
for( int i = 0; i < m_listenerList.size(); i++ ) {
final WeakReference< WikiEventListener > w = m_listenerList.get( i );
if( w.get() == null ) {
// Remove all such listeners which have expired
if (needsCleanup.get()) {
for (int i = 0; i < m_listenerList.size(); i++) {
final WeakReference<WikiEventListener> w = m_listenerList.get(i);
if (w.get() == null) {
m_listenerList.remove(i--);
}
}
}

} finally {
lock.unlock();
}
} catch( final ConcurrentModificationException e ) {
// We don't die, we just don't do notifications in that case.
LOG.info( "Concurrent modification of event list; please report this.", e );
});
} catch (final ConcurrentModificationException e) {
// We don't die, we just don't do notifications in that case.
LOG.info("Concurrent modification of event list; please report this.", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.wiki.auth.permissions.PagePermission;
import org.apache.wiki.pages.PageManager;
import org.apache.wiki.search.SearchProvider;
import org.apache.wiki.util.Synchronizer;
import org.apache.wiki.util.TextUtil;

import java.io.IOException;
Expand Down Expand Up @@ -157,7 +158,7 @@ public void pageRemoved( final Page page ) {
final BatchDeleteDocumentRequest request = new BatchDeleteDocumentRequest().withIndexId( indexId )
.withDocumentIdList( pageName );
final BatchDeleteDocumentResult result = getKendra().batchDeleteDocument( request );
if (result.getFailedDocuments().isEmpty()) {
if ( result.getFailedDocuments().size() == 0 ) {
LOG.debug( format( "Page '%s' was removed from index", pageName ) );
} else {
LOG.error( format( "Failed to remove Page '%s' from index", pageName ) );
Expand Down Expand Up @@ -346,23 +347,21 @@ private void doFullReindex() throws IOException {
* index pages that have been modified
*/
private void doPartialReindex() {
if ( updates.isEmpty() ) {
if (updates.isEmpty()) {
return;
}
LOG.debug( "Indexing updated pages. Please wait ..." );
LOG.debug("Indexing updated pages. Please wait ...");
final String executionId = startExecution();
lock.lock();
try {

Synchronizer.synchronize(lock, () -> {
try {
while (!updates.isEmpty()) {
indexOnePage( updates.remove( 0 ), executionId );
indexOnePage(updates.remove(0), executionId);
}
} finally {
stopExecution();
}
} finally {
lock.unlock();
}
});
}

/**
Expand Down Expand Up @@ -399,7 +398,7 @@ private void indexOnePage( final Page page, final String executionId ) {
final BatchPutDocumentRequest request = new BatchPutDocumentRequest().withIndexId( indexId )
.withDocuments( document );
final BatchPutDocumentResult result = getKendra().batchPutDocument( request );
if (result.getFailedDocuments().isEmpty()) {
if ( result.getFailedDocuments().size() == 0 ) {
LOG.info( format( "Successfully indexed Page '%s' as %s", page.getName(), document.getContentType() ) );
} else {
for ( final BatchPutDocumentResponseFailedDocument failedDocument : result.getFailedDocuments() ) {
Expand Down
Loading

0 comments on commit 659b8a5

Please sign in to comment.