Skip to content

Commit

Permalink
[SDCISA-15833] Cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenalpha committed May 2, 2024
1 parent 7258233 commit 8368677
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 33 deletions.
36 changes: 14 additions & 22 deletions src/main/java/org/swisspush/redisques/performance/JavaGcStats.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.swisspush.redisques.performance;

import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.slf4j.Logger;

Expand All @@ -10,7 +9,6 @@

import static java.lang.Float.NaN;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.currentThread;
import static org.slf4j.LoggerFactory.getLogger;


Expand All @@ -26,13 +24,11 @@ public class JavaGcStats {
private static float gcFrac01 = 0, gcFrac05 = 0, gcFrac15 = 0;

public JavaGcStats(Vertx vertx){
vertx.<Void>executeBlocking(this::initObserver, false, fut -> {
if( fut.failed() ) throw new RuntimeException("", fut.cause());
});
initObserver();
}

/** Fills in the values into the passed structure */
public void fillMeasurement(Measurement measurement) {
public Measurement fillMeasurement(Measurement measurement) {
long workerHeartbeatAgeMs = currentTimeMillis() - workerHeartbeatEpochMs;
if ( workerHeartbeatAgeMs > 60_000) {
log.warn("Huh? No worker heartbeat since {}ms?", workerHeartbeatAgeMs);
Expand All @@ -42,27 +38,24 @@ public void fillMeasurement(Measurement measurement) {
measurement.gcFrac05 = gcFrac05;
measurement.gcFrac15 = gcFrac15;
}
return measurement;
}

private void initObserver(Promise<Void> onDone) {
try {
assert !currentThread().getName().toUpperCase().contains("EVENTLOOP");
synchronized (workerInitLock) {
if( worker == null ){
worker = new Worker();
worker.setDaemon(true);
worker.start();
}
private void initObserver() {
synchronized (workerInitLock) {
if( worker == null ){
worker = new Worker();
worker.setDaemon(true);
worker.start();
}
onDone.complete();
} catch (Throwable ex) {
onDone.fail(ex);
}
}

private /*TODO rm static*/static class Worker extends Thread {

private static final int BACKLOG = 16;
private static final float SAMPLES_PER_SEC = 4f / 60f;
private static final int RECOVER_PERIOD_MS = 5_000;
private static final int BACKLOG = (int) (SAMPLES_PER_SEC * (15 * 60) + .5) + 1;
private final LongRingbuffer measuredGcTimes = new LongRingbuffer(BACKLOG);
private final LongRingbuffer measuredEpochMs = new LongRingbuffer(BACKLOG);
private final long tmpLongs[] = new long[BACKLOG];
Expand All @@ -75,7 +68,7 @@ private void initObserver(Promise<Void> onDone) {
if( unexpectedEx != null ){
// Reset, but cool-down before risking the exception again.
unexpectedEx = null;
sleep(5000);
sleep(RECOVER_PERIOD_MS);
}
assert currentThread() == worker;
step();
Expand Down Expand Up @@ -136,7 +129,6 @@ private void updateStats() {
assert gcFrac01 >= 0 && gcFrac01 <= 1 : "0 <= "+ gcFrac01 +" <= 1";
assert gcFrac05 >= 0 && gcFrac05 <= 1 : "0 <= "+ gcFrac05 +" <= 1";
assert gcFrac15 >= 0 && gcFrac15 <= 1 : "0 <= "+ gcFrac15 +" <= 1";
log.info("GC time usage: {}/min {}/5min {}/15min", gcFrac01, gcFrac05, gcFrac15);
synchronized (measurementLock) {
JavaGcStats.gcFrac01 = gcFrac01;
JavaGcStats.gcFrac05 = gcFrac05;
Expand All @@ -159,7 +151,7 @@ public static class Measurement {
* <p>Example 2: If gcFrac15 is 0.03, this means that 3 percent of
* execution time got into garbage collection.</p>
*/
float gcFrac01 = NaN, gcFrac05 = NaN, gcFrac15 = NaN;
public float gcFrac01 = NaN, gcFrac05 = NaN, gcFrac15 = NaN;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,24 @@ public class LongRingbuffer {

private static final Logger log = getLogger(LongRingbuffer.class);
private final Object pushpopLock = new Object();
private final int capacity;
private final long ring[];
private final int overflowMask;
private int wrCur;
private boolean isFilled;

LongRingbuffer(int capacity) {
if (capacity < 1) throw new IllegalArgumentException("assert(capacity >= 1)");
this.capacity = capacity;
this.ring = new long[capacity];
this.wrCur = 0;
int numBits = 0;
for (int i = 0; i < 30; ++i){
numBits += capacity >> i & 1;
}
if (numBits != 1) {
throw new IllegalArgumentException(capacity + " (capacity) MUST be one of 1, 2, 4, 8, 16, ...");
}
this.overflowMask = capacity - 1;
}

public void add(long value) {
synchronized (pushpopLock) {
//log.trace("ring[{}] = {}", wrCur, value);
ring[wrCur] = value;
wrCur += 1;
if (wrCur >= ring.length) {
if (wrCur >= capacity) {
wrCur = 0;
isFilled = true;
}
Expand All @@ -52,7 +46,7 @@ public int read(long dst[], int off, int len) {
rangeTwoLen = 0;
} else {
rangeOneOff = wrCur;
rangeOneLen = ring.length - rangeOneOff;
rangeOneLen = capacity - rangeOneOff;
rangeTwoOff = 0;
rangeTwoLen = rangeOneOff == 0 ? 0 : wrCur;
}
Expand Down

0 comments on commit 8368677

Please sign in to comment.