From 5b03c0cbd51fbfd9ce77af4fa50fd723f42068e9 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 24 Jan 2025 01:13:43 -0800 Subject: [PATCH] Use ThreadLocalRandom instead of Random in OutputObjectAndByteCounter to reduce contention (#33737) --- .../worker/OutputObjectAndByteCounter.java | 13 +++++++------ .../worker/OutputObjectAndByteCounterTest.java | 16 +++++++++++----- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounter.java index 9aa28b393ece..6c3be8c6f9e9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounter.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounter.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker.util.common.worker; import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.runners.core.ElementByteSizeObservable; import org.apache.beam.runners.dataflow.worker.counters.Counter; import org.apache.beam.runners.dataflow.worker.counters.CounterBackedElementByteSizeObserver; @@ -25,6 +26,7 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterMean; import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.NameContext; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.checkerframework.checker.nullness.qual.Nullable; /** An {@link ElementCounter} that counts output objects, bytes, and mean bytes. */ @@ -32,13 +34,12 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class OutputObjectAndByteCounter implements ElementCounter { + // Might be null, e.g., undeclared outputs will not have an // elementByteSizeObservable. private final ElementByteSizeObservable elementByteSizeObservable; private final CounterFactory counterFactory; - private Random randomGenerator = new Random(); - // Lowest sampling probability: 0.001%. private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000; private static final int SAMPLING_CUTOFF = 10; @@ -163,12 +164,12 @@ protected boolean sampleElement() { // samplingCutoff / samplingTokenUpperBound. This algorithm may be refined // later. samplingToken = Math.min(samplingToken + 1, samplingTokenUpperBound); - return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF; + return getRandom().nextInt(samplingToken) < SAMPLING_CUTOFF; } - public OutputObjectAndByteCounter setRandom(Random random) { - this.randomGenerator = random; - return this; + @VisibleForTesting + protected Random getRandom() { + return ThreadLocalRandom.current(); } private CounterName getCounterName(String name) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounterTest.java index 67370bdea86b..a2add528ffdd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounterTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/OutputObjectAndByteCounterTest.java @@ -92,13 +92,19 @@ public void testAddingCountersIntoCounterSet() throws Exception { } private OutputObjectAndByteCounter makeCounter(String name, int samplingPeriod, int seed) { - return new OutputObjectAndByteCounter( + OutputObjectAndByteCounter outputObjectAndByteCounter = + new OutputObjectAndByteCounter( new ElementByteSizeObservableCoder<>(StringUtf8Coder.of()), counterSet, - NameContextsForTests.nameContextForTest()) - .setSamplingPeriod(samplingPeriod) - .setRandom(new Random(seed)) - .countBytes(name); + NameContextsForTests.nameContextForTest()) { + private final Random random = new Random(seed); + + @Override + protected Random getRandom() { + return random; + } + }; + return outputObjectAndByteCounter.setSamplingPeriod(samplingPeriod).countBytes(name); } @Test