diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index bbf7da4e68cf5..413d1393095d6 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -419,6 +419,12 @@ + + com.github.ben-manes.caffeine + caffeine + 2.9.3 + + org.weakref jmxutils diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java index 2538169fd4934..26a417cb7391d 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergCommonModule.java @@ -83,6 +83,7 @@ import com.facebook.presto.spi.connector.ConnectorSplitManager; import com.facebook.presto.spi.procedure.Procedure; import com.facebook.presto.spi.statistics.ColumnStatistics; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.google.inject.Binder; @@ -97,6 +98,7 @@ import java.time.Duration; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.airlift.configuration.ConfigBinder.configBinder; @@ -200,7 +202,7 @@ protected void setup(Binder binder) @Provides public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBeanExporter exporter) { - Cache delegate = CacheBuilder.newBuilder() + com.github.benmanes.caffeine.cache.Cache delegate = Caffeine.newBuilder() .maximumWeight(config.getMaxStatisticsFileCacheSize().toBytes()) .weigher((key, entry) -> (int) entry.getEstimatedSize()) .recordStats() @@ -214,14 +216,15 @@ public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBean @Provides public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter) { - Cache delegate = CacheBuilder.newBuilder() + com.github.benmanes.caffeine.cache.Caffeine delegate = Caffeine.newBuilder() .maximumWeight(config.getMaxManifestCacheSize()) .weigher((key, entry) -> (int) entry.getData().stream().mapToLong(ByteBuffer::capacity).sum()) - .expireAfterWrite(Duration.ofMillis(config.getManifestCacheExpireDuration())) - .recordStats() - .build(); + .recordStats(); + if (config.getManifestCacheExpireDuration() > 0) { + delegate.expireAfterWrite(config.getManifestCacheExpireDuration(), MILLISECONDS); + } ManifestFileCache manifestFileCache = new ManifestFileCache( - delegate, + delegate.build(), config.getManifestCachingEnabled(), config.getManifestCacheMaxContentLength(), config.getManifestCacheMaxChunkSize().toBytes()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCache.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCache.java index 130505b51ab38..6de74f4fa84b9 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCache.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/ManifestFileCache.java @@ -14,20 +14,20 @@ package com.facebook.presto.iceberg; import com.facebook.airlift.stats.DistributionStat; -import com.facebook.presto.hive.CacheStatsMBean; -import com.google.common.cache.Cache; -import com.google.common.cache.ForwardingCache; +import com.facebook.presto.iceberg.cache.CaffeineCacheStatsMBean; +import com.facebook.presto.iceberg.cache.SimpleForwardingCache; +import com.github.benmanes.caffeine.cache.Cache; import org.weakref.jmx.Managed; import org.weakref.jmx.Nested; public class ManifestFileCache - extends ForwardingCache.SimpleForwardingCache + extends SimpleForwardingCache { private final DistributionStat fileSizes = new DistributionStat(); private final long maxFileLength; private final boolean enabled; private final long bufferChunkSize; - private final CacheStatsMBean statsMBean; + private final CaffeineCacheStatsMBean statsMBean; public ManifestFileCache(Cache delegate, boolean enabled, long maxFileLength, long bufferChunkSize) { @@ -35,12 +35,12 @@ public ManifestFileCache(Cache this.maxFileLength = maxFileLength; this.enabled = enabled; this.bufferChunkSize = bufferChunkSize; - this.statsMBean = new CacheStatsMBean(delegate); + this.statsMBean = new CaffeineCacheStatsMBean(delegate); } @Managed @Nested - public CacheStatsMBean getCacheStats() + public CaffeineCacheStatsMBean getCacheStats() { return statsMBean; } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/CaffeineCacheStatsMBean.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/CaffeineCacheStatsMBean.java new file mode 100644 index 0000000000000..10a00ddcefaee --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/CaffeineCacheStatsMBean.java @@ -0,0 +1,76 @@ +package com.facebook.presto.iceberg.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import org.weakref.jmx.Managed; + +import static java.util.Objects.requireNonNull; + +public class CaffeineCacheStatsMBean +{ + private final Cache cache; + + public CaffeineCacheStatsMBean(Cache cache) + { + this.cache = requireNonNull(cache, "cache is null"); + } + + @Managed + public long getEstimatedSize() + { + return cache.estimatedSize(); + } + + @Managed + public long getHitCount() + { + return cache.stats().hitCount(); + } + + @Managed + public long getMissCount() + { + return cache.stats().missCount(); + } + + @Managed + public double getHitRate() + { + return cache.stats().hitRate(); + } + + @Managed + public double getMissRate() + { + return cache.stats().missRate(); + } + + @Managed + public double getEvictionCount() + { + return cache.stats().evictionCount(); + } + + @Managed + public double getEvictionWeight() + { + return cache.stats().evictionWeight(); + } + + @Managed + public double getLoadCount() + { + return cache.stats().loadCount(); + } + + @Managed + public double getRequestCount() + { + return cache.stats().requestCount(); + } + + @Managed + public double getAverageLoadPenalty() + { + return cache.stats().averageLoadPenalty(); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/SimpleForwardingCache.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/SimpleForwardingCache.java new file mode 100644 index 0000000000000..1925bf6b0a9ba --- /dev/null +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/cache/SimpleForwardingCache.java @@ -0,0 +1,100 @@ +package com.facebook.presto.iceberg.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Policy; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import org.checkerframework.checker.index.qual.NonNegative; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; + +public class SimpleForwardingCache implements Cache +{ + private final Cache delegate; + + public SimpleForwardingCache(Cache delegate) + { + this.delegate = delegate; + } + + @Override + public @Nullable V getIfPresent(@NonNull Object key) + { + return delegate.getIfPresent(key); + } + + @Override + public @Nullable V get(@NonNull K key, @NonNull Function mappingFunction) + { + return delegate.get(key, mappingFunction); + } + + @Override + public @NonNull Map<@NonNull K, @NonNull V> getAllPresent(@NonNull Iterable<@NonNull ?> keys) + { + return delegate.getAllPresent(keys); + } + + @Override + public void put(@NonNull K key, @NonNull V value) + { + delegate.put(key, value); + } + + @Override + public void putAll(@NonNull Map map) + { + delegate.putAll(map); + } + + @Override + public void invalidate(@NonNull Object key) + { + delegate.invalidate(key); + } + + @Override + public void invalidateAll(@NonNull Iterable<@NonNull ?> keys) + { + delegate.invalidateAll(keys); + } + + @Override + public void invalidateAll() + { + delegate.invalidateAll(); + } + + @Override + public @NonNegative long estimatedSize() + { + return delegate.estimatedSize(); + } + + @Override + public @NonNull CacheStats stats() + { + return delegate.stats(); + } + + @Override + public @NonNull ConcurrentMap<@NonNull K, @NonNull V> asMap() + { + return delegate.asMap(); + } + + @Override + public void cleanUp() + { + delegate.cleanUp(); + } + + @Override + public @NonNull Policy policy() + { + return delegate.policy(); + } +} diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/StatisticsFileCache.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/StatisticsFileCache.java index 28bf787a0257a..8e4bec899995a 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/StatisticsFileCache.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/statistics/StatisticsFileCache.java @@ -14,10 +14,10 @@ package com.facebook.presto.iceberg.statistics; import com.facebook.airlift.stats.DistributionStat; -import com.facebook.presto.hive.CacheStatsMBean; +import com.facebook.presto.iceberg.cache.CaffeineCacheStatsMBean; +import com.facebook.presto.iceberg.cache.SimpleForwardingCache; import com.facebook.presto.spi.statistics.ColumnStatistics; -import com.google.common.cache.Cache; -import com.google.common.cache.ForwardingCache.SimpleForwardingCache; +import com.github.benmanes.caffeine.cache.Cache; import org.weakref.jmx.Managed; import org.weakref.jmx.Nested; @@ -26,17 +26,17 @@ public class StatisticsFileCache { private final DistributionStat fileSizes = new DistributionStat(); private final DistributionStat columnCounts = new DistributionStat(); - private final CacheStatsMBean cacheStats; + private final CaffeineCacheStatsMBean cacheStats; public StatisticsFileCache(Cache delegate) { super(delegate); - cacheStats = new CacheStatsMBean(delegate); + cacheStats = new CaffeineCacheStatsMBean(delegate); } @Managed @Nested - public CacheStatsMBean getCacheStats() + public CaffeineCacheStatsMBean getCacheStats() { return cacheStats; } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 1cc9d2876e983..397e07b6c8e11 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -52,9 +52,9 @@ import com.facebook.presto.testing.MaterializedRow; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.github.benmanes.caffeine.cache.stats.CacheStats; import com.google.common.base.Joiner; import com.google.common.base.Strings; -import com.google.common.cache.CacheStats; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java index 06a73559bab27..f22151a97efbf 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java @@ -28,6 +28,7 @@ import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.TableHandle; import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.base.Joiner; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheStats; @@ -113,17 +114,17 @@ public void testManifestFileCaching() ManifestFileCache manifestFileCache = metadata.getManifestFileCache(); assertUpdate(session, "INSERT INTO test_manifest_file_cache VALUES 1, 2, 3, 4, 5", 5); manifestFileCache.invalidateAll(); - assertEquals(manifestFileCache.size(), 0); - CacheStats initial = manifestFileCache.stats(); + assertEquals(manifestFileCache.estimatedSize(), 0); + com.github.benmanes.caffeine.cache.stats.CacheStats initial = manifestFileCache.stats(); assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i"); - CacheStats firstQuery = manifestFileCache.stats(); + com.github.benmanes.caffeine.cache.stats.CacheStats firstQuery = manifestFileCache.stats(); assertTrue(firstQuery.minus(initial).missCount() > 0); - assertTrue(manifestFileCache.size() > 0); + assertTrue(manifestFileCache.estimatedSize() > 0); assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i"); - CacheStats secondQuery = manifestFileCache.stats(); + com.github.benmanes.caffeine.cache.stats.CacheStats secondQuery = manifestFileCache.stats(); assertEquals(secondQuery.minus(firstQuery).missCount(), 0); assertTrue(secondQuery.minus(firstQuery).hitCount() > 0); - assertTrue(manifestFileCache.size() > 0); + assertTrue(manifestFileCache.estimatedSize() > 0); //drop table assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache"); @@ -158,15 +159,15 @@ public void testManifestFileCachingDisabled() IcebergHiveMetadata metadata = (IcebergHiveMetadata) delegate.get(catalogMetadata.getMetadataFor(handle.get().getConnectorId())); ManifestFileCache manifestFileCache = metadata.getManifestFileCache(); assertFalse(manifestFileCache.isEnabled()); - CacheStats initial = manifestFileCache.stats(); + com.github.benmanes.caffeine.cache.stats.CacheStats initial = manifestFileCache.stats(); assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache_disabled group by i"); - CacheStats firstQuery = manifestFileCache.stats(); + com.github.benmanes.caffeine.cache.stats.CacheStats firstQuery = manifestFileCache.stats(); assertEquals(firstQuery.minus(initial).hitCount(), 0); // - assertEquals(manifestFileCache.size(), 0); + assertEquals(manifestFileCache.estimatedSize(), 0); assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache_disabled group by i"); - CacheStats secondQuery = manifestFileCache.stats(); + com.github.benmanes.caffeine.cache.stats.CacheStats secondQuery = manifestFileCache.stats(); assertEquals(secondQuery.minus(firstQuery).hitCount(), 0); - assertEquals(manifestFileCache.size(), 0); + assertEquals(manifestFileCache.estimatedSize(), 0); //drop table assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache_disabled"); @@ -181,7 +182,7 @@ protected Table loadTable(String tableName) return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(), getHdfsEnvironment(), new IcebergHiveTableOperationsConfig(), - new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024 * 1024), + new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024 * 1024), getQueryRunner().getDefaultSession().toConnectorSession(connectorId), SchemaTableName.valueOf("tpch." + tableName)); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java index c473c8f5536ca..094941bb10347 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergHiveStatistics.java @@ -55,6 +55,7 @@ import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -583,7 +584,7 @@ private Table loadTable(String tableName) return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(), getHdfsEnvironment(), new IcebergHiveTableOperationsConfig(), - new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024), + new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024), getQueryRunner().getDefaultSession().toConnectorSession(connectorId), SchemaTableName.valueOf("tpch." + tableName)); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java index 03efa8609ffa6..fddfc2e68bf54 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java @@ -29,6 +29,7 @@ import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.tests.DistributedQueryRunner; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableSet; import org.apache.iceberg.Table; @@ -81,7 +82,7 @@ protected Table getIcebergTable(ConnectorSession session, String schema, String return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(), getHdfsEnvironment(), new IcebergHiveTableOperationsConfig(), - new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024), + new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024), session, SchemaTableName.valueOf(schema + "." + tableName)); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java index 33d0ac12d5e8e..3093eb08f9fb2 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestRenameTableOnFragileFileSystem.java @@ -70,7 +70,7 @@ import com.facebook.presto.sql.relational.RowExpressionDomainTranslator; import com.facebook.presto.sql.relational.RowExpressionOptimizer; import com.facebook.presto.testing.TestingConnectorSession; -import com.google.common.cache.CacheBuilder; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -413,8 +413,8 @@ private ConnectorMetadata getIcebergHiveMetadata(ExtendedHiveMetastore metastore new NodeVersion("test_node_v1"), FILTER_STATS_CALCULATOR_SERVICE, new IcebergHiveTableOperationsConfig(), - new StatisticsFileCache(CacheBuilder.newBuilder().build()), - new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024)); + new StatisticsFileCache(Caffeine.newBuilder().build()), + new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024)); return icebergHiveMetadataFactory.create(); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java index 625fbcd30514e..53d311e2d64fa 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/procedure/TestRemoveOrphanFilesProcedureHive.java @@ -27,6 +27,7 @@ import com.facebook.presto.spi.ConnectorId; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.SchemaTableName; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -71,7 +72,7 @@ Table createTable(String tableName, String targetPath, Map table getHdfsEnvironment(), hdfsContext, new IcebergHiveTableOperationsConfig(), - new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024), + new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024), "tpch", tableName, session.getUser(), @@ -94,7 +95,7 @@ Table loadTable(String tableName) return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(), getHdfsEnvironment(), new IcebergHiveTableOperationsConfig(), - new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024), + new ManifestFileCache(Caffeine.newBuilder().build(), false, 0, 1024), getQueryRunner().getDefaultSession().toConnectorSession(connectorId), SchemaTableName.valueOf("tpch." + tableName)); }