Skip to content

Commit

Permalink
Merge branch 'master' into refactor-workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
mizosoft authored Feb 10, 2024
2 parents 5308ad3 + 4632bf1 commit 6d0eadd
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Moataz Abdelnasser
* Copyright (c) 2024 Moataz Abdelnasser
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -32,6 +32,8 @@
import java.lang.System.Logger.Level;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -42,7 +44,7 @@ class RedisSupport {
static final String CLI_CMD = "redis-cli";

@GuardedBy("RedisSupport.class")
private static @MonotonicNonNull Set<String> lazyAvailability = null;
private static @MonotonicNonNull Set<String> lazyAvailableCommands = null;

private RedisSupport() {}

Expand All @@ -57,12 +59,12 @@ public static boolean isRedisClusterAvailable() {
private static synchronized boolean isAvailable(String command) {
requireArgument(
command.equals(SERVER_CMD) || command.equals(CLI_CMD), "unrecognized command: %s", command);
var availability = lazyAvailability;
if (availability == null) {
availability = checkAvailability();
lazyAvailability = availability;
var availableCommands = lazyAvailableCommands;
if (availableCommands == null) {
availableCommands = checkAvailability();
lazyAvailableCommands = availableCommands;
}
return availability.contains(command);
return availableCommands.contains(command);
}

private static Set<String> checkAvailability() {
Expand All @@ -88,7 +90,7 @@ private static Set<String> checkAvailability() {
}
}

private static String versionOf(String command) {
private static String versionOf(String command) throws UnavailableCommandException {
try {
var process =
new ProcessBuilder().command(command, "--version").redirectErrorStream(true).start();
Expand All @@ -113,7 +115,8 @@ private static void reportUnavailability(
String cmd,
String unavailabilityReason,
@Nullable Throwable exception,
@Nullable Reader reader) {
@Nullable Reader reader)
throws UnavailableCommandException {
String output;
try {
if (reader != null) {
Expand All @@ -131,7 +134,13 @@ private static void reportUnavailability(
}

throw new UnavailableCommandException(
String.format("Unavailable redis command <%s>: %s%n%s", cmd, unavailabilityReason, output),
String.format(
"Unavailable redis command <%s>: %s%n%s",
cmd,
unavailabilityReason,
Stream.of(output.split("\\r?\\n"))
.map(line -> "\t" + line)
.collect(Collectors.joining())),
exception);
}

Expand All @@ -145,7 +154,7 @@ static String dumpRemaining(Reader reader) throws IOException {
return sb.toString();
}

private static final class UnavailableCommandException extends IllegalStateException {
private static final class UnavailableCommandException extends IOException {
UnavailableCommandException(String message, Throwable cause) {
super(message, cause);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Moataz Abdelnasser
* Copyright (c) 2024 Moataz Abdelnasser
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -27,8 +27,6 @@
import com.github.mizosoft.methanol.internal.flow.AbstractQueueSubscription;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
Expand Down Expand Up @@ -86,17 +84,6 @@ public void subscribe(Subscriber<? super HttpResponse<T>> subscriber) {

private static final class SubscriptionImpl<V>
extends AbstractQueueSubscription<HttpResponse<V>> {
private static final VarHandle ONGOING;

static {
try {
var lookup = MethodHandles.lookup();
ONGOING = lookup.findVarHandle(SubscriptionImpl.class, "ongoing", int.class);
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new ExceptionInInitializerError(e);
}
}

private final HttpClient client;
private final HttpRequest initialRequest;
private final BodyHandler<V> handler;
Expand Down Expand Up @@ -238,7 +225,13 @@ private void applyPushPromise(
}

if (pushedResponseBodyHandler != null) {
ONGOING.getAndAdd(SubscriptionImpl.this, 1);
lock.lock();
try {
ongoing += 1;
} finally {
lock.unlock();
}

acceptor.apply(pushedResponseBodyHandler).whenComplete(SubscriptionImpl.this::onResponse);
}
}
Expand Down

0 comments on commit 6d0eadd

Please sign in to comment.