Skip to content

Commit

Permalink
Increase unit test coverage for AbstractMetrics (#13991)
Browse files Browse the repository at this point in the history
  • Loading branch information
spanasch authored Sep 18, 2024
1 parent 2dd7cca commit 956b7cf
Show file tree
Hide file tree
Showing 2 changed files with 330 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
*/
package org.apache.pinot.common.metrics;

import com.yammer.metrics.core.MetricName;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import org.apache.pinot.plugin.metrics.yammer.YammerMetricsRegistry;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.metrics.PinotMeter;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -108,4 +112,160 @@ public void testMultipleGauges() {
controllerMetrics.removeGauge(metricName2);
Assert.assertTrue(controllerMetrics.getMetricsRegistry().allMetrics().isEmpty());
}

/**
* Creates and initializes a concrete instance of {@link AbstractMetrics} (in this case, a {@code ControllerMetrics}).
* @return a {@code ControllerMetrics} suitable for testing {@code AbstractMetrics} APIs
*/
private static ControllerMetrics buildTestMetrics() {
PinotConfiguration pinotConfiguration = new PinotConfiguration();
pinotConfiguration.setProperty(CONFIG_OF_METRICS_FACTORY_CLASS_NAME,
"org.apache.pinot.plugin.metrics.yammer.YammerMetricsFactory");
PinotMetricUtils.init(pinotConfiguration);
return new ControllerMetrics(new YammerMetricsRegistry());
}

/**
* Tests the {@link AbstractMetrics} APIs relating to query phases
*/
@Test
public void testQueryPhases() {
final ControllerMetrics testMetrics = buildTestMetrics();
final MetricsInspector inspector = new MetricsInspector(testMetrics.getMetricsRegistry());

// Establish dummy values to be used in the test
final AbstractMetrics.QueryPhase testPhase = () -> "testPhase";
Assert.assertEquals(testPhase.getDescription(), "");
Assert.assertEquals(testPhase.getQueryPhaseName(), "testPhase");
final String testTableName = "tbl_testQueryPhases";
final String testTableName2 = "tbl2_testQueryPhases";

// Add a phase timing, check for correctness
testMetrics.addPhaseTiming(testTableName, testPhase, 1, TimeUnit.SECONDS);
final MetricName tbl1Metric = inspector.lastMetric();
Assert.assertEquals(inspector.getTimer(tbl1Metric).sum(), 1000);

// Add to the existing timer, using different API
testMetrics.addPhaseTiming(testTableName, testPhase, 444000000 /* nanoseconds */);
Assert.assertEquals(inspector.getTimer(tbl1Metric).sum(), 1444);

// Add phase timing to a different table. Verify new timer is set up correctly, old timer is not affected
testMetrics.addPhaseTiming(testTableName2, testPhase, 22, TimeUnit.MILLISECONDS);
final MetricName tbl2Metric = inspector.lastMetric();
Assert.assertEquals(inspector.getTimer(tbl2Metric).sum(), 22);
Assert.assertEquals(inspector.getTimer(tbl1Metric).sum(), 1444);

// Remove both timers. Verify the metrics registry is now empty
testMetrics.removePhaseTiming(testTableName, testPhase);
testMetrics.removePhaseTiming(testTableName2, testPhase);
Assert.assertTrue(testMetrics.getMetricsRegistry().allMetrics().isEmpty());
}

/**
* Tests the {@link AbstractMetrics} APIs relating to timer metrics
*/
@Test
public void testTimerMetrics() {
ControllerMetrics testMetrics = buildTestMetrics();
MetricsInspector inspector = new MetricsInspector(testMetrics.getMetricsRegistry());
String tableName = "tbl_testTimerMetrics";
String keyName = "keyName";
ControllerTimer timer = ControllerTimer.IDEAL_STATE_UPDATE_TIME_MS;

// Test timed table APIs
testMetrics.addTimedTableValue(tableName, timer, 6, TimeUnit.SECONDS);
final MetricName t1Metric = inspector.lastMetric();
Assert.assertEquals(inspector.getTimer(t1Metric).sum(), 6000);
testMetrics.addTimedTableValue(tableName, keyName, timer, 500, TimeUnit.MILLISECONDS);
final MetricName t2Metric = inspector.lastMetric();
Assert.assertEquals(inspector.getTimer(t2Metric).sum(), 500);

// Test timed value APIs
testMetrics.addTimedValue(timer, 40, TimeUnit.MILLISECONDS);
final MetricName t3Metric = inspector.lastMetric();
Assert.assertEquals(inspector.getTimer(t3Metric).sum(), 40);
testMetrics.addTimedValue(keyName, timer, 3, TimeUnit.MILLISECONDS);
final MetricName t4Metric = inspector.lastMetric();
Assert.assertEquals(inspector.getTimer(t4Metric).sum(), 3);

// Remove added timers and verify the metrics registry is now empty
Assert.assertEquals(testMetrics.getMetricsRegistry().allMetrics().size(), 4);
testMetrics.removeTableTimer(tableName, timer);
testMetrics.removeTimer(t2Metric.getName());
testMetrics.removeTimer(t3Metric.getName());
testMetrics.removeTimer(t4Metric.getName());
Assert.assertTrue(testMetrics.getMetricsRegistry().allMetrics().isEmpty());
}

/**
* Tests the {@link AbstractMetrics} APIs relating to metered metrics
*/
@Test
public void testMeteredMetrics() {
final ControllerMetrics testMetrics = buildTestMetrics();
final MetricsInspector inspector = new MetricsInspector(testMetrics.getMetricsRegistry());
final String tableName = "tbl_testMeteredMetrics";
final String keyName = "keyName";
final ControllerMeter meter = ControllerMeter.CONTROLLER_INSTANCE_POST_ERROR;
final ControllerMeter meter2 = ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR;

// Holder for the most recently seen Metric
final MetricName[] currentMetric = new MetricName[1];

// When a new metric is expected we'll call this lambda to assert the metric was created, and update currentMetric
final Runnable expectNewMetric = () -> {
Assert.assertNotEquals(inspector.lastMetric(), currentMetric[0]);
currentMetric[0] = inspector.lastMetric();
};

// Lambda to verify that the latest metric has the expected value, and its creation was expected
final IntConsumer expectMeteredCount = expected -> {
Assert.assertEquals(inspector.getMetered(currentMetric[0]).count(), expected);
Assert.assertEquals(currentMetric[0], inspector.lastMetric());
};

// Test global meter APIs
testMetrics.addMeteredGlobalValue(meter, 5);
expectNewMetric.run();
expectMeteredCount.accept(5);
testMetrics.addMeteredGlobalValue(meter, 4, testMetrics.getMeteredValue(meter));
expectMeteredCount.accept(9);

// Test meter with key APIs
testMetrics.addMeteredValue(keyName, meter, 9);
expectNewMetric.run();
expectMeteredCount.accept(9);
PinotMeter reusedMeter = testMetrics.addMeteredValue(keyName, meter2, 13, null);
expectNewMetric.run();
expectMeteredCount.accept(13);
testMetrics.addMeteredValue(keyName, meter2, 6, reusedMeter);
expectMeteredCount.accept(19);

// Test table-level meter APIs
testMetrics.addMeteredTableValue(tableName, meter, 15);
expectNewMetric.run();
expectMeteredCount.accept(15);
testMetrics.addMeteredTableValue(tableName, meter2, 3, testMetrics.getMeteredTableValue(tableName, meter));
expectMeteredCount.accept(18);

// Test table-level meter with additional key APIs
testMetrics.addMeteredTableValue(tableName, keyName, meter, 21);
expectNewMetric.run();
expectMeteredCount.accept(21);
reusedMeter = testMetrics.addMeteredTableValue(tableName, keyName, meter2, 23, null);
expectNewMetric.run();
expectMeteredCount.accept(23);
testMetrics.addMeteredTableValue(tableName, keyName, meter2, 5, reusedMeter);
expectMeteredCount.accept(28);

// Test removal APIs
Assert.assertEquals(testMetrics.getMetricsRegistry().allMetrics().size(), 6);
// This is the only AbstractMetrics method for removing Meter-type metrics. Should others be added?
testMetrics.removeTableMeter(tableName, meter);
Assert.assertEquals(testMetrics.getMetricsRegistry().allMetrics().size(), 5);
// If we do add other cleanup APIs to AbstractMetrics, they should be tested here. For now, clean the remaining
// metrics with generic APIs.
testMetrics.getMetricsRegistry().allMetrics().keySet().forEach(testMetrics.getMetricsRegistry()::removeMetric);
Assert.assertTrue(testMetrics.getMetricsRegistry().allMetrics().isEmpty());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.metrics;

import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Metered;
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricProcessor;
import com.yammer.metrics.core.MetricsRegistryListener;
import com.yammer.metrics.core.Timer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;


/**
* A helper for accessing metric values, agnostic of the concrete instance type of the metric. General usage is as
* follows: <br>
* <ul>
* <li>
* Construct a {@code MetricsInspector} from a {@code PinotMetricsRegistry}:<br>
* <pre>final MetricsInspector inspector = new MetricsInspector(pinotMetricsRegistry);</pre>
* </li>
* <li>
* Every time a metric is added to the registry this MetricsInspector will record it. The most recently added
* metric can be accessed with:<br>
* <pre>final MetricName metricName = inspector.lastMetric();</pre>
* </li>
* <li>
* Use the {@code MetricName} returned by {@link #_lastMetric} to query the properties of the corresponding
* metric, for example:
* <pre>inspector.getTimer(metricName)</pre>
* It's the caller's responsibility to know the type of the metric (Timer, Meter, etc.) and call the appropriate
* getter.
* </li>
* </ul>
*
*/
public class MetricsInspector {
private final Map<MetricName, Metric> _metricMap = new HashMap<>();
private MetricName _lastMetric;

public MetricsInspector(PinotMetricsRegistry registry) {

/* We detect newly added metrics by adding a listener to the metrics registry. Callers typically don't have
direct references to the metrics they create because factory methods usually create the metric and add it to the
registry without returning it. Since there is no easy way to look up the created metric afterward, the listener
approach provides callers with a way to access the metrics they've just created.
*/
registry.addListener(() -> new MetricsRegistryListener() {
@Override
public void onMetricAdded(MetricName metricName, Metric metric) {
_metricMap.put(metricName, metric);
_lastMetric = metricName;
}
@Override
public void onMetricRemoved(MetricName metricName) {
_metricMap.remove(metricName);
}
});
}

/**
* @return the {@code MetricName} of the last metric that was added to the registry
*/
public MetricName lastMetric() {
return _lastMetric;
}

/**
* Extracts the {@code Timer} from a {@code Timer} metric.
*
* @param metric a {@code MetricName} returned by a previous call to {@link #_lastMetric}
* @return the {@code Timer} from the associated metric.
* @throws IllegalArgumentException if the provided {@code MetricName} is not associated with a {@code Timer} metric
*/
public Timer getTimer(MetricName metric) {
return access(metric, m -> m._timer);
}

/**
* Extracts the {@code Metered} from a {@code Metered} metric.
*
* @param metric a {@code MetricName} returned by a previous call to {@link #_lastMetric}
* @return the {@code Metered} from the associated metric.
* @throws IllegalArgumentException if the provided {@code MetricName} is not associated with a {@code Metered} metric
*/

public Metered getMetered(MetricName metric) {
return access(metric, m -> m._metered);
}

private <T> T access(MetricName metricName, Function<MetricAccessor, T> property) {
Metric metric = _metricMap.get(metricName);
if (metric == null) {
throw new IllegalArgumentException("Metric not found: " + metricName);
}

MetricAccessor accessor = new MetricAccessor();
try {
metric.processWith(accessor, null, null);
} catch (Exception e) {
// Convert checked exception (from processWith API) to unchecked because our MetricProcessor doesn't throw
throw new IllegalStateException("Unexpected error processing metric: " + metric, e);
}

T result = property.apply(accessor);
if (result == null) {
throw new IllegalArgumentException("Requested metric type not found in metric [" + metricName.getName() + "]");
}
return result;
}

/**
* A MetricProcessor that simply captures the internals of a {@code Metric}. For internal use only.<br>
*/
private static class MetricAccessor implements MetricProcessor<Void> {

public Metered _metered;
public Counter _counter;
public Histogram _histogram;
public Timer _timer;
public Gauge<?> _gauge;

@Override
public void processMeter(MetricName n, Metered metered, Void v) {
_metered = metered;
}

@Override
public void processCounter(MetricName n, Counter counter, Void v) {
_counter = counter;
}

@Override
public void processHistogram(MetricName n, Histogram histogram, Void v) {
_histogram = histogram;
}

@Override
public void processTimer(MetricName n, Timer timer, Void v) {
_timer = timer;
}

@Override
public void processGauge(MetricName metricName, Gauge<?> gauge, Void v) {
_gauge = gauge;
}
}
}

0 comments on commit 956b7cf

Please sign in to comment.