Skip to content

Commit

Permalink
use caffeine for iceberg-specific caches
Browse files Browse the repository at this point in the history
  • Loading branch information
ZacBlanco committed Feb 21, 2025
1 parent 9a73101 commit 238dc52
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 39 deletions.
6 changes: 6 additions & 0 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.inject.Binder;
Expand All @@ -97,6 +98,7 @@
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
Expand Down Expand Up @@ -200,7 +202,7 @@ protected void setup(Binder binder)
@Provides
public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBeanExporter exporter)
{
Cache<StatisticsFileCacheKey, ColumnStatistics> delegate = CacheBuilder.newBuilder()
com.github.benmanes.caffeine.cache.Cache<StatisticsFileCacheKey, ColumnStatistics> delegate = Caffeine.newBuilder()
.maximumWeight(config.getMaxStatisticsFileCacheSize().toBytes())
.<StatisticsFileCacheKey, ColumnStatistics>weigher((key, entry) -> (int) entry.getEstimatedSize())
.recordStats()
Expand All @@ -214,14 +216,15 @@ public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBean
@Provides
public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter)
{
Cache<ManifestFileCacheKey, ManifestFileCachedContent> delegate = CacheBuilder.newBuilder()
com.github.benmanes.caffeine.cache.Caffeine<ManifestFileCacheKey, ManifestFileCachedContent> delegate = Caffeine.newBuilder()
.maximumWeight(config.getMaxManifestCacheSize())
.<ManifestFileCacheKey, ManifestFileCachedContent>weigher((key, entry) -> (int) entry.getData().stream().mapToLong(ByteBuffer::capacity).sum())
.expireAfterWrite(Duration.ofMillis(config.getManifestCacheExpireDuration()))
.recordStats()
.build();
.recordStats();
if (config.getManifestCacheExpireDuration() > 0) {
delegate.expireAfterWrite(config.getManifestCacheExpireDuration(), MILLISECONDS);
}
ManifestFileCache manifestFileCache = new ManifestFileCache(
delegate,
delegate.build(),
config.getManifestCachingEnabled(),
config.getManifestCacheMaxContentLength(),
config.getManifestCacheMaxChunkSize().toBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,33 @@
package com.facebook.presto.iceberg;

import com.facebook.airlift.stats.DistributionStat;
import com.facebook.presto.hive.CacheStatsMBean;
import com.google.common.cache.Cache;
import com.google.common.cache.ForwardingCache;
import com.facebook.presto.iceberg.cache.CaffeineCacheStatsMBean;
import com.facebook.presto.iceberg.cache.SimpleForwardingCache;
import com.github.benmanes.caffeine.cache.Cache;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

public class ManifestFileCache
extends ForwardingCache.SimpleForwardingCache<ManifestFileCacheKey, ManifestFileCachedContent>
extends SimpleForwardingCache<ManifestFileCacheKey, ManifestFileCachedContent>
{
private final DistributionStat fileSizes = new DistributionStat();
private final long maxFileLength;
private final boolean enabled;
private final long bufferChunkSize;
private final CacheStatsMBean statsMBean;
private final CaffeineCacheStatsMBean statsMBean;

public ManifestFileCache(Cache<ManifestFileCacheKey, ManifestFileCachedContent> delegate, boolean enabled, long maxFileLength, long bufferChunkSize)
{
super(delegate);
this.maxFileLength = maxFileLength;
this.enabled = enabled;
this.bufferChunkSize = bufferChunkSize;
this.statsMBean = new CacheStatsMBean(delegate);
this.statsMBean = new CaffeineCacheStatsMBean(delegate);
}

@Managed
@Nested
public CacheStatsMBean getCacheStats()
public CaffeineCacheStatsMBean getCacheStats()
{
return statsMBean;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.facebook.presto.iceberg.cache;

import com.github.benmanes.caffeine.cache.Cache;
import org.weakref.jmx.Managed;

import static java.util.Objects.requireNonNull;

public class CaffeineCacheStatsMBean
{
private final Cache<?, ?> cache;

public CaffeineCacheStatsMBean(Cache<?, ?> cache)
{
this.cache = requireNonNull(cache, "cache is null");
}

@Managed
public long getEstimatedSize()
{
return cache.estimatedSize();
}

@Managed
public long getHitCount()
{
return cache.stats().hitCount();
}

@Managed
public long getMissCount()
{
return cache.stats().missCount();
}

@Managed
public double getHitRate()
{
return cache.stats().hitRate();
}

@Managed
public double getMissRate()
{
return cache.stats().missRate();
}

@Managed
public double getEvictionCount()
{
return cache.stats().evictionCount();
}

@Managed
public double getEvictionWeight()
{
return cache.stats().evictionWeight();
}

@Managed
public double getLoadCount()
{
return cache.stats().loadCount();
}

@Managed
public double getRequestCount()
{
return cache.stats().requestCount();
}

@Managed
public double getAverageLoadPenalty()
{
return cache.stats().averageLoadPenalty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.facebook.presto.iceberg.cache;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Policy;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

public class SimpleForwardingCache<K, V> implements Cache<K, V>
{
private final Cache<K, V> delegate;

public SimpleForwardingCache(Cache<K, V> delegate)
{
this.delegate = delegate;
}

@Override
public @Nullable V getIfPresent(@NonNull Object key)
{
return delegate.getIfPresent(key);
}

@Override
public @Nullable V get(@NonNull K key, @NonNull Function<? super K, ? extends V> mappingFunction)
{
return delegate.get(key, mappingFunction);
}

@Override
public @NonNull Map<@NonNull K, @NonNull V> getAllPresent(@NonNull Iterable<@NonNull ?> keys)
{
return delegate.getAllPresent(keys);
}

@Override
public void put(@NonNull K key, @NonNull V value)
{
delegate.put(key, value);
}

@Override
public void putAll(@NonNull Map<? extends @NonNull K, ? extends @NonNull V> map)
{
delegate.putAll(map);
}

@Override
public void invalidate(@NonNull Object key)
{
delegate.invalidate(key);
}

@Override
public void invalidateAll(@NonNull Iterable<@NonNull ?> keys)
{
delegate.invalidateAll(keys);
}

@Override
public void invalidateAll()
{
delegate.invalidateAll();
}

@Override
public @NonNegative long estimatedSize()
{
return delegate.estimatedSize();
}

@Override
public @NonNull CacheStats stats()
{
return delegate.stats();
}

@Override
public @NonNull ConcurrentMap<@NonNull K, @NonNull V> asMap()
{
return delegate.asMap();
}

@Override
public void cleanUp()
{
delegate.cleanUp();
}

@Override
public @NonNull Policy<K, V> policy()
{
return delegate.policy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
package com.facebook.presto.iceberg.statistics;

import com.facebook.airlift.stats.DistributionStat;
import com.facebook.presto.hive.CacheStatsMBean;
import com.facebook.presto.iceberg.cache.CaffeineCacheStatsMBean;
import com.facebook.presto.iceberg.cache.SimpleForwardingCache;
import com.facebook.presto.spi.statistics.ColumnStatistics;
import com.google.common.cache.Cache;
import com.google.common.cache.ForwardingCache.SimpleForwardingCache;
import com.github.benmanes.caffeine.cache.Cache;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

Expand All @@ -26,17 +26,17 @@ public class StatisticsFileCache
{
private final DistributionStat fileSizes = new DistributionStat();
private final DistributionStat columnCounts = new DistributionStat();
private final CacheStatsMBean cacheStats;
private final CaffeineCacheStatsMBean cacheStats;

public StatisticsFileCache(Cache<StatisticsFileCacheKey, ColumnStatistics> delegate)
{
super(delegate);
cacheStats = new CacheStatsMBean(delegate);
cacheStats = new CaffeineCacheStatsMBean(delegate);
}

@Managed
@Nested
public CacheStatsMBean getCacheStats()
public CaffeineCacheStatsMBean getCacheStats()
{
return cacheStats;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.cache.CacheStats;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.base.Joiner;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheStats;
Expand Down Expand Up @@ -113,17 +114,17 @@ public void testManifestFileCaching()
ManifestFileCache manifestFileCache = metadata.getManifestFileCache();
assertUpdate(session, "INSERT INTO test_manifest_file_cache VALUES 1, 2, 3, 4, 5", 5);
manifestFileCache.invalidateAll();
assertEquals(manifestFileCache.size(), 0);
CacheStats initial = manifestFileCache.stats();
assertEquals(manifestFileCache.estimatedSize(), 0);
com.github.benmanes.caffeine.cache.stats.CacheStats initial = manifestFileCache.stats();
assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i");
CacheStats firstQuery = manifestFileCache.stats();
com.github.benmanes.caffeine.cache.stats.CacheStats firstQuery = manifestFileCache.stats();
assertTrue(firstQuery.minus(initial).missCount() > 0);
assertTrue(manifestFileCache.size() > 0);
assertTrue(manifestFileCache.estimatedSize() > 0);
assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i");
CacheStats secondQuery = manifestFileCache.stats();
com.github.benmanes.caffeine.cache.stats.CacheStats secondQuery = manifestFileCache.stats();
assertEquals(secondQuery.minus(firstQuery).missCount(), 0);
assertTrue(secondQuery.minus(firstQuery).hitCount() > 0);
assertTrue(manifestFileCache.size() > 0);
assertTrue(manifestFileCache.estimatedSize() > 0);

//drop table
assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache");
Expand Down Expand Up @@ -158,15 +159,15 @@ public void testManifestFileCachingDisabled()
IcebergHiveMetadata metadata = (IcebergHiveMetadata) delegate.get(catalogMetadata.getMetadataFor(handle.get().getConnectorId()));
ManifestFileCache manifestFileCache = metadata.getManifestFileCache();
assertFalse(manifestFileCache.isEnabled());
CacheStats initial = manifestFileCache.stats();
com.github.benmanes.caffeine.cache.stats.CacheStats initial = manifestFileCache.stats();
assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache_disabled group by i");
CacheStats firstQuery = manifestFileCache.stats();
com.github.benmanes.caffeine.cache.stats.CacheStats firstQuery = manifestFileCache.stats();
assertEquals(firstQuery.minus(initial).hitCount(), 0); //
assertEquals(manifestFileCache.size(), 0);
assertEquals(manifestFileCache.estimatedSize(), 0);
assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache_disabled group by i");
CacheStats secondQuery = manifestFileCache.stats();
com.github.benmanes.caffeine.cache.stats.CacheStats secondQuery = manifestFileCache.stats();
assertEquals(secondQuery.minus(firstQuery).hitCount(), 0);
assertEquals(manifestFileCache.size(), 0);
assertEquals(manifestFileCache.estimatedSize(), 0);

//drop table
assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache_disabled");
Expand All @@ -181,7 +182,7 @@ protected Table loadTable(String tableName)
return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(),
getHdfsEnvironment(),
new IcebergHiveTableOperationsConfig(),
new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024 * 1024),
new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024 * 1024),
getQueryRunner().getDefaultSession().toConnectorSession(connectorId),
SchemaTableName.valueOf("tpch." + tableName));
}
Expand Down
Loading

0 comments on commit 238dc52

Please sign in to comment.