Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OAK-11443 - Use a single DefaultIndexWriterFactory in the LuceneIndexEditorProvider #2041

Draft
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class IndexCopier implements CopyOnReadStatsMBean, Closeable {

private final Map<String, String> indexPathVersionMapping = new ConcurrentHashMap<>();
private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = new ConcurrentHashMap<>();
private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final boolean prefetchEnabled;
private volatile boolean closed;
private final IndexRootDirectory indexRootDirectory;
Expand Down Expand Up @@ -528,9 +528,7 @@ public TabularData getIndexPathMapping() {
IndexMappingData.FIELD_NAMES,
new String[]{indexDir.getJcrPath(), indexDir.getFSPath(), size}));
}
} catch (OpenDataException e){
throw new IllegalStateException(e);
} catch (IOException e) {
} catch (OpenDataException | IOException e){
throw new IllegalStateException(e);
}
return tds;
Expand Down Expand Up @@ -608,7 +606,7 @@ public long getLocalIndexDirSize() {
@Override
public String[] getGarbageDetails() {
return toArray(transform(failedToDeleteFiles.values(),
input -> input.deleteLog()), String.class);
LocalIndexFile::deleteLog), String.class);
}

@Override
Expand Down Expand Up @@ -652,7 +650,7 @@ public String getSkippedFromUploadSize() {
@Override
public String[] getCopyInProgressDetails() {
return toArray(transform(copyInProgressFiles,
input -> input.copyLog()), String.class);
LocalIndexFile::copyLog), String.class);
}

@Override
Expand Down Expand Up @@ -706,10 +704,10 @@ public interface COWDirectoryTracker {

COWDirectoryTracker NOOP = new COWDirectoryTracker() {
@Override
public void registerOpenedDirectory(CopyOnWriteDirectory directory) {}
public void registerOpenedDirectory(@NotNull CopyOnWriteDirectory directory) {}

@Override
public void registerReindexingLocalDirectory(File dir) {}
public void registerReindexingLocalDirectory(@NotNull File dir) {}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,17 @@
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.ReadOnlyBuilder;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.lucene.index.IndexableField;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument;
import static java.util.Objects.requireNonNull;
Expand All @@ -68,7 +69,6 @@
*
* @see LuceneIndexEditor
* @see IndexEditorProvider
*
*/
public class LuceneIndexEditorProvider implements IndexEditorProvider {
private final static Logger LOG = LoggerFactory.getLogger(LuceneIndexEditorProvider.class);
Expand All @@ -95,6 +95,10 @@ public class LuceneIndexEditorProvider implements IndexEditorProvider {
private int inMemoryDocsLimit = Integer.getInteger("oak.lucene.inMemoryDocsLimit", 500);
private AsyncIndexesSizeStatsUpdate asyncIndexesSizeStatsUpdate;

private final ReentrantLock defaultWriterFactoryInitLock = new ReentrantLock();
private DefaultIndexWriterFactory defaultIndexWriterFactory;
private COWDirectoryCleanupCallback cowDirectoryCleanupCallback;

public LuceneIndexEditorProvider() {
this(null);
}
Expand Down Expand Up @@ -150,16 +154,16 @@ public LuceneIndexEditorProvider withAsyncIndexesSizeStatsUpdate(AsyncIndexesSiz

@Override
public Editor getIndexEditor(
@NotNull String type, @NotNull NodeBuilder definition, @NotNull NodeState root,
@NotNull IndexUpdateCallback callback)
@NotNull String type, @NotNull NodeBuilder definition, @NotNull NodeState root,
@NotNull IndexUpdateCallback callback)
throws CommitFailedException {
if (TYPE_LUCENE.equals(type)) {
checkArgument(callback instanceof ContextAwareCallback,
"callback instance not of type ContextAwareCallback [%s]", callback);
IndexingContext indexingContext = ((ContextAwareCallback)callback).getIndexingContext();
IndexingContext indexingContext = ((ContextAwareCallback) callback).getIndexingContext();
BlobDeletionCallback blobDeletionCallback = activeDeletedBlobCollector.getBlobDeletionCallback();
indexingContext.registerIndexCommitCallback(blobDeletionCallback);
FulltextIndexWriterFactory writerFactory = null;
FulltextIndexWriterFactory<Iterable<? extends IndexableField>> writerFactory = null;
LuceneIndexDefinition indexDefinition = null;
boolean asyncIndexing = true;
String indexPath = indexingContext.getIndexPath();
Expand All @@ -170,12 +174,12 @@ public Editor getIndexEditor(

//Would not participate in reindexing. Only interested in
//incremental indexing
if (indexingContext.isReindexing()){
if (indexingContext.isReindexing()) {
return null;
}

CommitContext commitContext = getCommitContext(indexingContext);
if (commitContext == null){
if (commitContext == null) {
//Logically there should not be any commit without commit context. But
//some initializer code does the commit with out it. So ignore such calls with
//warning now
Expand All @@ -192,9 +196,9 @@ public Editor getIndexEditor(
//IndexDefinition from tracker might differ from one passed here for reindexing
//case which should be fine. However reusing existing definition would avoid
//creating definition instance for each commit as this gets executed for each commit
if (indexTracker != null){
if (indexTracker != null) {
indexDefinition = indexTracker.getIndexDefinition(indexPath);
if (indexDefinition != null && !indexDefinition.hasMatchingNodeTypeReg(root)){
if (indexDefinition != null && !indexDefinition.hasMatchingNodeTypeReg(root)) {
LOG.debug("Detected change in NodeType registry for index {}. Would not use " +
"existing index definition", indexDefinition.getIndexPath());
indexDefinition = null;
Expand Down Expand Up @@ -226,12 +230,12 @@ public Editor getIndexEditor(
}

if (writerFactory == null) {
COWDirectoryCleanupCallback cowDirectoryCleanupCallback = new COWDirectoryCleanupCallback();
// The default writer factory is generic, it does not depend on the details of the index definition
// We can create it once and reuse it for all indexes
initDefaultWriterFactory();
LOG.info("Registering COWDirectoryCleanupCallback for {}", indexPath);
indexingContext.registerIndexCommitCallback(cowDirectoryCleanupCallback);

writerFactory = new DefaultIndexWriterFactory(mountInfoProvider,
newDirectoryFactory(blobDeletionCallback, cowDirectoryCleanupCallback),
writerConfig);
writerFactory = defaultIndexWriterFactory;
}

LuceneIndexEditorContext context = new LuceneIndexEditorContext(root, definition, indexDefinition, callback,
Expand All @@ -257,6 +261,22 @@ public Editor getIndexEditor(
return null;
}

private void initDefaultWriterFactory() {
defaultWriterFactoryInitLock.lock();
try {
if (defaultIndexWriterFactory == null) {
LOG.info("Initializing DefaultIndexWriterFactory");
cowDirectoryCleanupCallback = new COWDirectoryCleanupCallback();
BlobDeletionCallback blobDeletionCallback = activeDeletedBlobCollector.getBlobDeletionCallback();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it is possible to add a test case to show the problem of the current code

defaultIndexWriterFactory = new DefaultIndexWriterFactory(mountInfoProvider,
newDirectoryFactory(blobDeletionCallback, cowDirectoryCleanupCallback),
writerConfig);
}
} finally {
defaultWriterFactoryInitLock.unlock();
}
}

IndexCopier getIndexCopier() {
return indexCopier;
}
Expand All @@ -278,7 +298,7 @@ protected DirectoryFactory newDirectoryFactory(BlobDeletionCallback blobDeletion
return new DefaultDirectoryFactory(indexCopier, blobStore, blobDeletionCallback, cowDirectoryTracker);
}

private LuceneDocumentHolder getDocumentHolder(CommitContext commitContext){
private LuceneDocumentHolder getDocumentHolder(CommitContext commitContext) {
LuceneDocumentHolder holder = (LuceneDocumentHolder) commitContext.get(LuceneDocumentHolder.NAME);
if (holder == null) {
holder = new LuceneDocumentHolder(indexingQueue, inMemoryDocsLimit);
Expand Down Expand Up @@ -315,8 +335,8 @@ private static CommitContext getCommitContext(IndexingContext indexingContext) {
private static class COWDirectoryCleanupCallback implements IndexCommitCallback, COWDirectoryTracker {
private static final Logger LOG = LoggerFactory.getLogger(COWDirectoryCleanupCallback.class);

private final List<CopyOnWriteDirectory> openedCoWDirectories = new ArrayList<>();
private final List<File> reindexingLocalDirectories = new ArrayList<>();
private final CopyOnWriteArrayList<CopyOnWriteDirectory> openedCoWDirectories = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<File> reindexingLocalDirectories = new CopyOnWriteArrayList<>();

@Override
public void commitProgress(IndexProgress indexProgress) {
Expand All @@ -331,7 +351,7 @@ public void commitProgress(IndexProgress indexProgress) {
}

for (File f : reindexingLocalDirectories) {
if ( ! FileUtils.deleteQuietly(f)) {
if (!FileUtils.deleteQuietly(f)) {
LOG.warn("Failed to delete {}", f);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package org.apache.jackrabbit.oak.plugins.index.lucene.writer;

import org.apache.jackrabbit.oak.commons.properties.SystemPropertySupplier;
import org.apache.lucene.index.IndexWriterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LuceneIndexWriterConfig {
private final static Logger LOG = LoggerFactory.getLogger(LuceneIndexWriterConfig.class);
/**
* This property will be used to set Lucene's IndexWriter.maxBufferedDeleteTerms
* IndexWriter.maxBufferedDeleteTerms is used to flush buffered data to lucene index.
Expand All @@ -34,11 +38,14 @@ public class LuceneIndexWriterConfig {
*/
public final static String RAM_PER_THREAD_HARD_LIMIT_MB_KEY = "oak.index.lucene.ramPerThreadHardLimitMB";

private final int maxBufferedDeleteTerms = SystemPropertySupplier.create(
MAX_BUFFERED_DELETE_TERMS_KEY, IndexWriterConfig.DISABLE_AUTO_FLUSH)
.loggingTo(LOG).get();
private final int ramPerThreadHardLimitMB = SystemPropertySupplier.create(
RAM_PER_THREAD_HARD_LIMIT_MB_KEY, IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB)
.loggingTo(LOG).get();

private final double ramBufferSizeMB;
private final int maxBufferedDeleteTerms = Integer.getInteger(MAX_BUFFERED_DELETE_TERMS_KEY,
IndexWriterConfig.DISABLE_AUTO_FLUSH);
private final int ramPerThreadHardLimitMB = Integer.getInteger(RAM_PER_THREAD_HARD_LIMIT_MB_KEY,
IndexWriterConfig.DEFAULT_RAM_PER_THREAD_HARD_LIMIT_MB);
private final int threadCount;

public LuceneIndexWriterConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,9 @@ public FacetsConfig getFacetsConfig() {
return facetsConfig;
}


@Override
public String toString() {
    // Delegates to the underlying definition's string form — presumably so log
    // output identifies the index this object belongs to; TODO confirm intent.
    return definition.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;

Expand Down Expand Up @@ -165,7 +166,7 @@ static Iterable<? extends PropertyState> getProperties(
x -> !keys.contains(x == null ? null : x.getName());
return concat(
filter(base.getProperties(), predicate::test),
filter(properties.values(), x -> x != null));
filter(properties.values(), Objects::nonNull));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change seems unrelated...

}
}

Expand Down
Loading