Skip to content

Commit

Permalink
refactor: extract internal array util methods into file `InternalComm…
Browse files Browse the repository at this point in the history
…onUtils` 🔧
  • Loading branch information
oldratlee committed Sep 30, 2024
1 parent 88595c6 commit 23d7d30
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 98 deletions.
124 changes: 27 additions & 97 deletions cffu-core/src/main/java/io/foldright/cffu/CompletableFutureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
import org.jetbrains.annotations.Contract;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.*;

import static io.foldright.cffu.Delayer.atCfDelayerThread;
import static io.foldright.cffu.ExceptionReporter.reportUncaughtException;
import static io.foldright.cffu.InternalCommonUtils.*;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;

Expand Down Expand Up @@ -323,23 +323,8 @@ public static <T> CompletableFuture<T> mSupplyAnyAsync(Executor executor, Suppli
return f_cast(CompletableFuture.anyOf(wrapSuppliers0(executor, suppliers)));
}

@SafeVarargs
private static <T> T[] requireArrayAndEleNonNull(String varName, T... array) {
requireNonNull(array, varName + "s is null");
for (int i = 0; i < array.length; i++) {
requireNonNull(array[i], varName + (i + 1) + " is null");
}
return array;
}

private static <T> CompletableFuture<? extends T>[] wrapSuppliers0(
Executor executor, Supplier<? extends T>[] suppliers) {
@SuppressWarnings("unchecked")
CompletableFuture<? extends T>[] cfs = new CompletableFuture[suppliers.length];
for (int i = 0; i < suppliers.length; i++) {
cfs[i] = CompletableFuture.supplyAsync(suppliers[i], executor);
}
return cfs;
private static <T> CompletableFuture<? extends T>[] wrapSuppliers0(Executor executor, Supplier<? extends T>[] suppliers) {
return mapArray(suppliers, CompletableFuture[]::new, s -> CompletableFuture.supplyAsync(s, executor));
}

/**
Expand Down Expand Up @@ -465,12 +450,7 @@ public static CompletableFuture<Void> mRunAnyAsync(Executor executor, Runnable..
}

private static CompletableFuture<Void>[] wrapRunnables0(Executor executor, Runnable[] actions) {
@SuppressWarnings("unchecked")
CompletableFuture<Void>[] cfs = new CompletableFuture[actions.length];
for (int i = 0; i < actions.length; i++) {
cfs[i] = CompletableFuture.runAsync(actions[i], executor);
}
return cfs;
return mapArray(actions, CompletableFuture[]::new, a -> CompletableFuture.runAsync(a, executor));
}

// endregion
Expand Down Expand Up @@ -693,19 +673,14 @@ public static <T1, T2, T3, T4, T5> CompletableFuture<Tuple5<T1, T2, T3, T4, T5>>
}

private static <T> CompletableFuture<T> f_allSuccessTupleOf0(CompletionStage<?>[] stages) {
Function<CompletionStage<Object>, CompletionStage<Object>> converter = s -> s.exceptionally(ex -> null);
return f_allTupleOf0(false, convertStageArray0(converter, stages));
return f_allTupleOf0(false, f_convertStageArray0(stages, s -> s.exceptionally(ex -> null)));
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static <T, U> CompletionStage<U>[] convertStageArray0(
Function<CompletionStage<T>, ? extends CompletionStage<U>> converter,
CompletionStage<? extends T>[] stages) {
CompletionStage[] ret = new CompletionStage[stages.length];
for (int i = 0; i < stages.length; i++) {
ret[i] = converter.apply((CompletionStage) stages[i]);
}
return ret;
private static <T, U> CompletionStage<U>[] f_convertStageArray0(
CompletionStage<? extends T>[] stages, Function<CompletionStage<T>, CompletionStage<U>> converter) {
@SuppressWarnings({"unchecked", "rawtypes"})
CompletionStage<T>[] ss = (CompletionStage[]) stages;
return mapArray(ss, CompletionStage[]::new, converter);
}

/**
Expand Down Expand Up @@ -831,7 +806,7 @@ private static <T> CompletableFuture<T> f_mostSuccessTupleOf0(
// otherwise UnsupportedOperationException
final CompletableFuture<Object>[] cfArray = toNonMinCfArray0(stages);
return cffuCompleteOnTimeout(CompletableFuture.allOf(cfArray), null, executorWhenTimeout, timeout, unit)
.handle((unused, ex) -> f_tupleOf0(MGetSuccessNow0(null, cfArray)));
.handle((unused, ex) -> f_tupleOf0(f_mGetSuccessNow0(null, cfArray)));
}

/**
Expand All @@ -842,12 +817,8 @@ private static <T> CompletableFuture<T> f_mostSuccessTupleOf0(
* otherwise UnsupportedOperationException
*/
@SuppressWarnings("unchecked")
private static <T> T[] MGetSuccessNow0(@Nullable Object valueIfNotSuccess, CompletableFuture<?>[] cfs) {
Object[] ret = new Object[cfs.length];
for (int i = 0; i < cfs.length; i++) {
ret[i] = getSuccessNow(cfs[i], valueIfNotSuccess);
}
return (T[]) ret;
private static <T> T[] f_mGetSuccessNow0(@Nullable T valueIfNotSuccess, CompletableFuture<? extends T>[] cfs) {
return (T[]) fillArray(new Object[cfs.length], i -> getSuccessNow(cfs[i], valueIfNotSuccess));
}

/**
Expand Down Expand Up @@ -962,7 +933,7 @@ private static <T> CompletableFuture<List<T>> allResultsFastFailOf0(CompletionSt
if (len == 0) return completedFuture(arrayList());
// convert input cf to non-minimal-stage CF instance for SINGLE input in order to
// ensure that the returned cf is not minimal-stage instance(UnsupportedOperationException)
if (len == 1) return toNonMinCf0(cfs[0]).thenApply(CompletableFutureUtils::arrayList);
if (len == 1) return toNonMinCf0(cfs[0]).thenApply(InternalCommonUtils::arrayList);

final CompletableFuture<?>[] successOrBeIncomplete = new CompletableFuture[len];
// NOTE: fill ONE MORE element of failedOrBeIncomplete LATER
Expand Down Expand Up @@ -999,8 +970,7 @@ public static <T> CompletableFuture<List<T>> allSuccessResultsOf(

private static <T> CompletableFuture<List<T>> allSuccessResultsOf0(
@Nullable T valueIfFailed, CompletionStage<? extends T>[] cfs) {
Function<CompletionStage<T>, CompletionStage<T>> converter = s -> s.exceptionally(ex -> valueIfFailed);
return allResultsOf0(convertStageArray0(converter, cfs));
return allResultsOf0(f_convertStageArray0(cfs, s -> s.exceptionally(ex -> valueIfFailed)));
}

/**
Expand Down Expand Up @@ -1070,7 +1040,7 @@ private static <T> CompletableFuture<List<T>> mostSuccessResultsOf0(
// otherwise UnsupportedOperationException
final CompletableFuture<T>[] cfArray = toNonMinCfArray0(cfs);
return cffuCompleteOnTimeout(CompletableFuture.allOf(cfArray), null, executorWhenTimeout, timeout, unit)
.handle((unused, ex) -> arrayList(MGetSuccessNow0(valueIfNotSuccess, cfArray)));
.handle((unused, ex) -> arrayList(f_mGetSuccessNow0(valueIfNotSuccess, cfArray)));
}

/**
Expand Down Expand Up @@ -1099,7 +1069,7 @@ private static <T> CompletableFuture<List<T>> allResultsOf0(CompletionStage<? ex
if (len == 0) return completedFuture(arrayList());
// convert input cf to non-minimal-stage CF instance for SINGLE input in order to
// ensure that the returned cf is not minimal-stage instance(UnsupportedOperationException)
if (len == 1) return toNonMinCf0(cfs[0]).thenApply(CompletableFutureUtils::arrayList);
if (len == 1) return toNonMinCf0(cfs[0]).thenApply(InternalCommonUtils::arrayList);

final Object[] result = new Object[len];
final CompletableFuture<Void>[] resultSetterCfs = createResultSetterCfs(cfs, result);
Expand Down Expand Up @@ -1211,29 +1181,14 @@ private static <S extends CompletionStage<?>> S[] requireCfsAndEleNonNull(S... s
return requireArrayAndEleNonNull("cf", stages);
}

/**
* Returns normal array list instead of unmodifiable or fixed-size list.
* Safer for application code which may reuse the returned list as normal collection.
*/
@SafeVarargs
private static <T> List<T> arrayList(T... elements) {
List<T> ret = new ArrayList<>(elements.length);
ret.addAll(Arrays.asList(elements));
return ret;
}

/**
* Returns a cf array whose elements do the result collection.
*/
private static <T> CompletableFuture<Void>[] createResultSetterCfs(
CompletionStage<? extends T>[] stages, T[] result) {
@SuppressWarnings("unchecked")
final CompletableFuture<Void>[] resultSetterCfs = new CompletableFuture[result.length];
for (int i = 0; i < result.length; i++) {
final int index = i;
resultSetterCfs[index] = f_toCf0(stages[index]).thenAccept(v -> result[index] = v);
}
return resultSetterCfs;
return fillArray(resultSetterCfs, i -> f_toCf0(stages[i]).thenAccept(v -> result[i] = v));
}

private static <T> void fill0(CompletionStage<? extends T>[] stages,
Expand Down Expand Up @@ -1261,26 +1216,15 @@ private static <T> CompletableFuture<T> f_cast(CompletableFuture<?> cf) {
* More info see method {@link #f_toCf0(CompletionStage)}.
*/
private static <T> CompletableFuture<T>[] f_toCfArray0(CompletionStage<? extends T>[] stages) {
return _toCfArray0(CompletableFutureUtils::f_toCf0, stages);
return mapArray(stages, CompletableFuture[]::new, CompletableFutureUtils::f_toCf0);
}

/**
* Converts {@link CompletionStage} array to {@link CompletableFuture} array.
* More info see method {@link #toNonMinCf0(CompletionStage)}.
*/
private static <T> CompletableFuture<T>[] toNonMinCfArray0(CompletionStage<? extends T>[] stages) {
return _toCfArray0(CompletableFutureUtils::toNonMinCf0, stages);
}

private static <T> CompletableFuture<T>[] _toCfArray0(
Function<CompletionStage<? extends T>, CompletableFuture<T>> converter,
CompletionStage<? extends T>[] stages) {
@SuppressWarnings("unchecked")
CompletableFuture<T>[] ret = new CompletableFuture[stages.length];
for (int i = 0; i < stages.length; i++) {
ret[i] = converter.apply(stages[i]);
}
return ret;
return mapArray(stages, CompletableFuture[]::new, CompletableFutureUtils::toNonMinCf0);
}

/**
Expand Down Expand Up @@ -1320,10 +1264,6 @@ private static <T> CompletableFuture<T> toNonMinCfCopy0(CompletionStage<? extend
return isMinStageCf(f) ? f.toCompletableFuture() : copy(f);
}

static boolean isMinStageCf(CompletableFuture<?> cf) {
return cf.getClass().equals(MIN_STAGE_CLASS);
}

// endregion
////////////////////////////////////////////////////////////
// region## anyOf* Methods
Expand Down Expand Up @@ -2022,13 +1962,7 @@ public static <T, U> CompletableFuture<U> thenMApplyAnyAsync(

private static <T, U> CompletableFuture<U>[] wrapFunctions0(
Executor executor, @Nullable T v, Function<? super T, ? extends U>[] fns) {
@SuppressWarnings("unchecked")
CompletableFuture<U>[] cfs = new CompletableFuture[fns.length];
for (int i = 0; i < fns.length; i++) {
final int idx = i;
cfs[i] = CompletableFuture.supplyAsync(() -> fns[idx].apply(v), executor);
}
return cfs;
return mapArray(fns, CompletableFuture[]::new, f -> CompletableFuture.supplyAsync(() -> f.apply(v), executor));
}

/**
Expand Down Expand Up @@ -2160,13 +2094,7 @@ public static <T> CompletableFuture<Void> thenMAcceptAnyAsync(
}

private static <T> CompletableFuture<Void>[] wrapConsumers0(Executor executor, T v, Consumer<? super T>[] actions) {
@SuppressWarnings("unchecked")
CompletableFuture<Void>[] cfs = new CompletableFuture[actions.length];
for (int i = 0; i < actions.length; i++) {
final int idx = i;
cfs[idx] = CompletableFuture.runAsync(() -> actions[idx].accept(v), executor);
}
return cfs;
return mapArray(actions, CompletableFuture[]::new, a -> CompletableFuture.runAsync(() -> a.accept(v), executor));
}

/**
Expand Down Expand Up @@ -3757,10 +3685,8 @@ C peekAsync(C cfThis, BiConsumer<? super T, ? super Throwable> action, Executor
public static <T> T join(CompletableFuture<T> cfThis, long timeout, TimeUnit unit) {
requireNonNull(cfThis, "cfThis is null");
requireNonNull(unit, "unit is null");

if (cfThis.isDone()) return cfThis.join();
// defensive copy input cf to avoid writing it by `orTimeout`
return orTimeout(copy(cfThis), timeout, unit).join();
return cfThis.isDone() ? cfThis.join() : orTimeout(copy(cfThis), timeout, unit).join();
}

/**
Expand Down Expand Up @@ -4209,6 +4135,10 @@ private static boolean methodExists(Supplier<?> methodCallCheck) {
else MIN_STAGE_CLASS = CompletableFuture.completedStage(null).getClass();
}

static boolean isMinStageCf(CompletableFuture<?> cf) {
return cf.getClass().equals(MIN_STAGE_CLASS);
}

private CompletableFutureUtils() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ final class ExceptionReporter {

@Nullable
@Contract("_, _ -> null")
@SuppressWarnings("StatementWithEmptyBody")
@SuppressWarnings({"StatementWithEmptyBody", "SameReturnValue"})
static <T> T reportUncaughtException(String where, Throwable ex) {
final String fullReport = "full";
final String shortReport = "short";
Expand Down
58 changes: 58 additions & 0 deletions cffu-core/src/main/java/io/foldright/cffu/InternalCommonUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.foldright.cffu;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.function.Function;
import java.util.function.IntFunction;

import static java.util.Objects.requireNonNull;


/**
* Internal common util methods.
*/
final class InternalCommonUtils {
@SafeVarargs
static <T> T[] requireArrayAndEleNonNull(String varName, T... array) {
requireNonNull(array, varName + "s is null");
for (int i = 0; i < array.length; i++) {
requireNonNull(array[i], varName + (i + 1) + " is null");
}
return array;
}

/**
* example code of "map int[] to string[]":
*
* <pre>{@code Integer[] source = new Integer[3];
* mapArray(source, String[]::new, i -> "integer: " + i);
* }</pre>
*/
@SuppressWarnings("unchecked")
static <T, R> R[] mapArray(T[] source, IntFunction<Object[]> destConstructor, Function<T, R> mapper) {
Object[] ret = destConstructor.apply(source.length);
for (int i = 0; i < source.length; i++) {
ret[i] = mapper.apply(source[i]);
}
return (R[]) ret;
}

static <T> T[] fillArray(T[] array, IntFunction<T> init) {
for (int i = 0; i < array.length; i++) {
array[i] = init.apply(i);
}
return array;
}

/**
* Returns normal array list instead of unmodifiable({@link java.util.List#of}) or fixed-size
* ({@link Arrays#asList}) list. Safer for application code which may reuse the return list as normal collection.
*/
@SafeVarargs
static <T> ArrayList<T> arrayList(T... elements) {
return new ArrayList<>(Arrays.asList(elements));
}

private InternalCommonUtils() {
}
}

0 comments on commit 23d7d30

Please sign in to comment.