Skip to content

Commit

Permalink
Start Solr indexer on CDX completion #2
Browse files Browse the repository at this point in the history
  • Loading branch information
ato committed Nov 6, 2015
1 parent 004532d commit a407a9c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
13 changes: 13 additions & 0 deletions src/bamboo/task/CdxIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.zip.ZipException;

public class CdxIndexer implements Runnable {
private static final int BATCH_SIZE = 1024;
private final DbPool dbPool;
private final List<Consumer<Long>> warcIndexedListeners = new ArrayList<>();

/**
 * Creates a CDX indexer backed by the given database pool.
 *
 * @param dbPool database pool stored on this indexer for later use
 */
public CdxIndexer(DbPool dbPool) {
this.dbPool = dbPool;
}

/**
 * Registers a callback that is invoked with the WARC id each time a WARC
 * finishes CDX indexing.
 *
 * <p>Listeners are held in a plain {@code ArrayList} with no synchronization.
 * NOTE(review): presumably all listeners are registered before the indexer
 * starts running on another thread; confirm against callers.
 *
 * @param callback receiver of the indexed WARC's id; must not be {@code null}
 * @throws NullPointerException if {@code callback} is {@code null}
 */
public void onWarcIndexed(Consumer<Long> callback) {
    // Fail fast: a null listener would otherwise only blow up at notification
    // time, after a WARC has already been indexed.
    warcIndexedListeners.add(java.util.Objects.requireNonNull(callback, "callback"));
}

public void run() {
while (true) {
List<Db.Warc> warcs;
Expand Down Expand Up @@ -164,6 +170,13 @@ private void indexWarc(Db.Warc warc) throws IOException {
});
}
System.out.println("Finished CDX indexing " + warc.id + " " + warc.path + " (" + stats.records + " records with " + stats.bytes + " bytes)");
sendWarcIndexedNotification(warc.id);
}

/**
 * Invokes every registered WARC-indexed listener, in registration order,
 * passing the id of the WARC that was just indexed.
 *
 * @param warcId id of the WARC whose CDX indexing has finished
 */
private void sendWarcIndexedNotification(long warcId) {
    java.util.Iterator<Consumer<Long>> callbacks = warcIndexedListeners.iterator();
    while (callbacks.hasNext()) {
        Consumer<Long> callback = callbacks.next();
        callback.accept(warcId);
    }
}

void indexWarc(long warcId) throws IOException {
Expand Down
11 changes: 9 additions & 2 deletions src/bamboo/task/Taskmaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@ public class Taskmaster {
/**
 * Builds the importer task and the indexer tasks, and wires the Solr indexer
 * to be started whenever the CDX indexer reports a WARC as indexed.
 *
 * @param config configuration passed to the importer
 * @param dbPool database pool shared by the importer and both indexers
 */
public Taskmaster(Config config, DbPool dbPool) {
    importer = new Task(new Importer(config, dbPool, this::startIndexing));
    tasks.add(importer);

    Task solrIndexerTask = new Task(new SolrIndexer(dbPool));

    // Start the Solr indexer task each time the CDX indexer finishes a WARC.
    // NOTE(review): assumes Task.start() tolerates repeated calls; confirm.
    CdxIndexer cdxIndexer = new CdxIndexer(dbPool);
    cdxIndexer.onWarcIndexed(warcId -> solrIndexerTask.start());

    // Register exactly one CDX task and one Solr task. A second, unwired pair
    // would index the same WARCs twice and never trigger the Solr hand-off.
    indexers.add(new Task(cdxIndexer));
    indexers.add(solrIndexerTask);

    tasks.addAll(indexers);
}

Expand Down

0 comments on commit a407a9c

Please sign in to comment.