Skip to content

Commit

Permalink
Greatly improve the performance and concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
octylFractal committed Jul 17, 2024
1 parent 97830f3 commit 3a7e958
Show file tree
Hide file tree
Showing 13 changed files with 410 additions and 281 deletions.
19 changes: 14 additions & 5 deletions app/src/main/java/org/enginehub/cassettedeck/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,36 @@

package org.enginehub.cassettedeck;

import com.google.common.net.HttpHeaders;
import org.enginehub.cassettedeck.data.blob.BlobStorage;
import org.enginehub.cassettedeck.data.blob.DiskStorage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.http.HttpHeaders;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.client.RestTemplate;

import java.net.http.HttpClient;
import java.nio.file.Path;
import java.time.Duration;

@Configuration
@PropertySource("classpath:application.properties")
@EnableScheduling
public class AppConfig {
@Bean
public HttpClient httpClient() {
return HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(10))
.build();
}

@Bean
public RestTemplate restTemplate(RestTemplateBuilder restTemplateBuilder) {
return restTemplateBuilder
.defaultHeader(HttpHeaders.USER_AGENT, "cassette-deck")
.defaultHeader(HttpHeaders.USER_AGENT, CassetteDeck.USER_AGENT)
.build();
}

Expand All @@ -50,14 +59,14 @@ public DiskStorage libraryBlobStorage(
}

@Bean("blockStateData")
public BlobStorage blockStateDataBlobStorage(
public DiskStorage blockStateDataBlobStorage(
@Value("${disk.block-state-data.storage-dir}") Path storageDir
) {
return new DiskStorage(storageDir);
}

@Bean("worldEditCliData")
public BlobStorage worldEditCliDataBlobStorage(
public DiskStorage worldEditCliDataBlobStorage(
@Value("${disk.worldedit-cli-data.storage-dir}") Path storageDir
) {
return new DiskStorage(storageDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
}
)
public class CassetteDeck {
public static final String USER_AGENT = "cassette-deck";

public static void main(String[] args) {
System.setProperty("org.jooq.no-logo", "true");
System.setProperty("org.jooq.no-tips", "true");
Expand Down

This file was deleted.

208 changes: 173 additions & 35 deletions app/src/main/java/org/enginehub/cassettedeck/data/blob/DiskStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,62 @@

package org.enginehub.cassettedeck.data.blob;

import com.google.common.util.concurrent.Striped;
import org.apache.commons.io.function.IOConsumer;
import org.apache.commons.io.function.IOFunction;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.springframework.scheduling.annotation.Scheduled;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Stream;

public class DiskStorage {

private static final Duration EXPIRATION = Duration.ofDays(30);
private static final Logger LOGGER = LogManager.getLogger();

private static Path writeToTempFile(Path ourKey, IOConsumer<Path> consumer) throws IOException {
Path tempFile = Files.createTempFile(ourKey.getParent(), ourKey.getFileName().toString(), ".tmp");
tempFile.toFile().deleteOnExit();
try {
consumer.accept(tempFile);
} catch (Throwable t) {
try {
Files.delete(tempFile);
} catch (IOException e) {
LOGGER.warn("Failed to delete temp file: {}", tempFile, e);
}
throw t;
}
return tempFile;
}

private static void touchKey(Path path) {
try {
Files.setLastModifiedTime(path, FileTime.from(Instant.now()));
} catch (IOException e) {
throw new UncheckedIOException("Failed to update last modified time for " + path, e);
}
}

public class DiskStorage implements BlobStorage {
// NB: This class assumes only a single process is running, and therefore only uses an in-process lock.
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Striped<ReadWriteLock> locks = Striped.readWriteLock(32);
private final Path storageDir;

public DiskStorage(Path storageDir) {
Expand All @@ -44,8 +85,42 @@ public DiskStorage(Path storageDir) {
}
}

@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.DAYS)
public void cleanUpOldEntries() {
Instant keepAfter = Instant.now().minus(EXPIRATION);
try (Stream<Path> walk = Files.walk(storageDir)) {
walk
.filter(Files::isRegularFile)
.forEach(path -> {
Lock lock = locks.get(path).readLock();
lock.lock();
try {
FileTime lastModifiedTime;
try {
lastModifiedTime = Files.getLastModifiedTime(path);
} catch (IOException e) {
LOGGER.warn("Failed to get last modified time for file: {}", path, e);
return;
}
if (lastModifiedTime.toInstant().isAfter(keepAfter)) {
return;
}
try {
Files.delete(path);
} catch (IOException e) {
LOGGER.warn("Failed to delete old file: {}", path, e);
}
} finally {
lock.unlock();
}
});
} catch (IOException e) {
LOGGER.warn("Failed to clean up old entries", e);
}
}

private Path ourKey(String key) {
Path ourKey = storageDir.resolve(key);
Path ourKey = storageDir.resolve(key).toAbsolutePath();
if (ourKey.equals(storageDir)) {
throw new IllegalArgumentException("Key path is the storage directory: " + key);
}
Expand All @@ -55,74 +130,137 @@ private Path ourKey(String key) {
return ourKey;
}

public @Nullable Path retrievePath(String key) {
var path = ourKey(key);
if (!Files.exists(path)) {
return null;
public <R extends @Nullable Object> R usePath(String key, IOFunction<Path, R> consumer) throws IOException {
Path ourKey = ourKey(key);
Lock lock = locks.get(ourKey).readLock();
lock.lock();
try {
if (!Files.isRegularFile(ourKey)) {
throw new IOException("No such file: " + ourKey);
}
touchKey(ourKey);
return consumer.apply(ourKey);
} finally {
lock.unlock();
}
}

public <R extends @Nullable Object> R usePaths(List<String> key, IOFunction<List<Path>, R> consumer) throws IOException {
List<Path> ourKeys = key.stream().map(this::ourKey).toList();
List<Lock> readLocks = new ArrayList<>(ourKeys.size());
for (ReadWriteLock lock : locks.bulkGet(ourKeys)) {
readLocks.add(lock.readLock());
}
for (int i = 0; i < ourKeys.size(); i++) {
try {
readLocks.get(i).lock();
} catch (Throwable t) {
// This is really paranoid, but we don't want to leave any locks held
// Even this isn't exactly perfect, if unlock throws, we're in trouble. But that should never happen.
for (int j = i - 1; j >= 0; j--) {
readLocks.get(j).unlock();
}
throw t;
}
}
try {
for (Path ourKey : ourKeys) {
if (!Files.isRegularFile(ourKey)) {
throw new IOException("No such file: " + ourKey);
}
touchKey(ourKey);
}
return consumer.apply(ourKeys);
} finally {
for (Lock lock : readLocks) {
lock.unlock();
}
}
return path;
}

@Override
public InputStream retrieve(String key) throws IOException {
// This API is Linux-specific in design, but we don't care about Windows.
// Specifically, we rely on atomic moves and the ability to delete open files without issue.
public @Nullable InputStream retrieve(String key) throws IOException {
Path ourKey = ourKey(key);
lock.readLock().lock();
Lock lock = locks.get(ourKey).readLock();
lock.lock();
try {
if (Files.isRegularFile(ourKey)) {
touchKey(ourKey);
// racy, but we _should_ be the sole owner of the storage
// anyone cleaning our files can suffer
return Files.newInputStream(ourKey);
}
return null;
} finally {
lock.readLock().unlock();
lock.unlock();
}
return null;
}

@Override
public void store(String key, OutputStreamConsumer consumer) throws IOException {
public void store(String key, IOConsumer<Path> consumer) throws IOException {
Path ourKey = ourKey(key);
Files.createDirectories(ourKey.getParent());
lock.writeLock().lock();
Lock lock = locks.get(ourKey).writeLock();
lock.lock();
try {
try (var output = Files.newOutputStream(ourKey)) {
consumer.accept(output);
Path tempFile = writeToTempFile(ourKey, consumer);
try {
Files.move(tempFile, ourKey, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
} catch (Throwable t) {
try {
Files.delete(tempFile);
} catch (IOException e) {
LOGGER.warn("Failed to delete temp file: {}", tempFile, e);
}
throw t;
}
} finally {
lock.writeLock().unlock();
lock.unlock();
}
}

@Override
public InputStream storeIfAbsent(String key, OutputStreamConsumer consumer) throws IOException {
/**
* If there is no blob for the given key, use {@code consumer} to fill it, then return a stream to get the contents
* of the blob.
*
* @param key the key
* @param consumer the blob provider
* @return the content of the blob
* @throws IOException if there is an I/O error
*/
public InputStream storeIfAbsent(String key, IOConsumer<Path> consumer) throws IOException {
Path ourKey = ourKey(key);
while (true) {
lock.readLock().lock();
Lock lock = locks.get(ourKey).writeLock();
lock.lock();
try {
if (Files.isRegularFile(ourKey)) {
// racy, but we _should_ be the sole owner of the storage
// anyone cleaning our files can suffer
return Files.newInputStream(ourKey);
}
tryStore(ourKey, consumer);
} finally {
lock.readLock().unlock();
lock.unlock();
}
tryStore(ourKey, consumer);
}
}

private void tryStore(Path ourKey, OutputStreamConsumer consumer) throws IOException {
private void tryStore(Path ourKey, IOConsumer<Path> consumer) throws IOException {
Files.createDirectories(ourKey.getParent());
lock.writeLock().lock();
Path tempFile = writeToTempFile(ourKey, consumer);
try {
// Atomically open the file for writing, failing if it's already there
try (var output = Files.newOutputStream(ourKey, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
consumer.accept(output);
} catch (FileAlreadyExistsException ignored) {
// We'll catch the new file on our next go around.
Files.move(tempFile, ourKey, StandardCopyOption.ATOMIC_MOVE);
} catch (FileAlreadyExistsException e) {
// someone else beat us to it
Files.delete(tempFile);
} catch (Throwable t) {
try {
Files.delete(tempFile);
} catch (IOException e) {
LOGGER.warn("Failed to delete temp file: {}", tempFile, e);
}
} finally {
lock.writeLock().unlock();
throw t;
}
}
}
Loading

0 comments on commit 3a7e958

Please sign in to comment.