Skip to content

Commit

Permalink
feat(callback): Support callback functions
Browse files Browse the repository at this point in the history
  • Loading branch information
CarmJos committed Jan 22, 2024
1 parent 6f91ddf commit 548bb84
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 12 deletions.
21 changes: 17 additions & 4 deletions api/src/main/java/cc/carm/plugin/mineredis/MineRedis.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cc.carm.plugin.mineredis;

import cc.carm.plugin.mineredis.api.RedisManager;
import cc.carm.plugin.mineredis.api.callback.RedisCallbackBuilder;
import cc.carm.plugin.mineredis.api.message.RedisMessageListener;
import com.google.common.io.ByteArrayDataOutput;
import io.lettuce.core.ClientOptions;
Expand Down Expand Up @@ -105,8 +106,8 @@ public static long publish(@NotNull String channel, @NotNull Consumer<ByteArrayD
return getManager().publish(channel, byteOutput);
}

public static long publish(@NotNull String channel, @NotNull String content) {
return getManager().publish(channel, content);
public static long publish(@NotNull String channel, @NotNull Object... values) {
return getManager().publish(channel, values);
}

public static RedisFuture<Long> publishAsync(@NotNull String channel, @NotNull ByteArrayDataOutput byteOutput) {
Expand All @@ -117,8 +118,20 @@ public static RedisFuture<Long> publishAsync(@NotNull String channel, @NotNull C
return getManager().publishAsync(channel, byteOutput);
}

public static RedisFuture<Long> publishAsync(@NotNull String channel, @NotNull String content) {
return getManager().publishAsync(channel, content);
public static RedisFuture<Long> publishAsync(@NotNull String channel, @NotNull Object... values) {
return getManager().publishAsync(channel, values);
}

public static RedisCallbackBuilder request(@NotNull String channel, @NotNull ByteArrayDataOutput byteOutput) {
return getManager().callback(channel, byteOutput);
}

public static RedisCallbackBuilder request(@NotNull String channel, @NotNull Consumer<ByteArrayDataOutput> byteOutput) {
return getManager().callback(channel, byteOutput);
}

public static RedisCallbackBuilder request(@NotNull String channel, @NotNull Object... values) {
return getManager().callback(channel, values);
}

public static void registerGlobalListener(@NotNull RedisMessageListener listener, @NotNull RedisMessageListener... moreListeners) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package cc.carm.plugin.mineredis.api;

import cc.carm.plugin.mineredis.api.callback.RedisCallbackBuilder;
import cc.carm.plugin.mineredis.api.message.RedisMessage;
import cc.carm.plugin.mineredis.api.message.RedisMessageListener;
import com.google.common.io.ByteArrayDataOutput;
import io.lettuce.core.RedisFuture;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* 发布与订阅(Pub/Sub)管理器。
Expand Down Expand Up @@ -54,16 +60,24 @@ public interface RedisMessageManager {

long publish(@NotNull String channel, @NotNull Consumer<ByteArrayDataOutput> byteOutput);

default long publish(@NotNull String channel, @NotNull String content) {
return publish(channel, s -> s.writeUTF(content));
default long publish(@NotNull String channel, @NotNull Object... values) {
return publish(channel, s -> writeParams(s, Arrays.asList(values)));
}

RedisFuture<Long> publishAsync(@NotNull String channel, @NotNull ByteArrayDataOutput byteOutput);

RedisFuture<Long> publishAsync(@NotNull String channel, @NotNull Consumer<ByteArrayDataOutput> byteOutput);

default RedisFuture<Long> publishAsync(@NotNull String channel, @NotNull String content) {
return publishAsync(channel, s -> s.writeUTF(content));
default RedisFuture<Long> publishAsync(@NotNull String channel, @NotNull Object... values) {
return publishAsync(channel, s -> writeParams(s, Arrays.asList(values)));
}

RedisCallbackBuilder callback(@NotNull String channel, @NotNull ByteArrayDataOutput byteOutput);

RedisCallbackBuilder callback(@NotNull String channel, @NotNull Consumer<ByteArrayDataOutput> byteOutput);

default RedisCallbackBuilder callback(@NotNull String channel, @NotNull Object... values) {
return callback(channel, s -> writeParams(s, Arrays.asList(values)));
}

void registerGlobalListener(@NotNull RedisMessageListener listener, @NotNull RedisMessageListener... moreListeners);
Expand All @@ -75,4 +89,28 @@ void registerPatternListener(@NotNull RedisMessageListener listener,
@NotNull String channelPattern, @NotNull String... morePatterns);

void unregisterListener(@NotNull RedisMessageListener listener);

static void writeParams(ByteArrayDataOutput data, List<Object> params) {
params.forEach(param -> writeParam(data, param));
}

static void writeParam(ByteArrayDataOutput data, Object value) {
if (value instanceof Long) {
data.writeLong((Long) value);
} else if (value instanceof Integer) {
data.writeInt((Integer) value);
} else if (value instanceof Short) {
data.writeShort((Short) value);
} else if (value instanceof Byte) {
data.writeByte((Byte) value);
} else if (value instanceof Double) {
data.writeDouble((Double) value);
} else if (value instanceof Float) {
data.writeFloat((Float) value);
} else if (value instanceof Boolean) {
data.writeBoolean((Boolean) value);
} else if (value instanceof String) {
data.writeUTF((String) value);
} else throw new IllegalArgumentException("Unsupported type: " + value.getClass().getName());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package cc.carm.plugin.mineredis.api.callback;

import cc.carm.plugin.mineredis.api.RedisManager;
import cc.carm.plugin.mineredis.api.message.RedisMessage;
import cc.carm.plugin.mineredis.api.message.RedisMessageListener;
import com.google.common.io.ByteArrayDataOutput;
import org.jetbrains.annotations.NotNull;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

public class RedisCallbackBuilder {

protected final @NotNull RedisManager redis;
protected final @NotNull String requestChannel;
protected final @NotNull ByteArrayDataOutput requestData;

protected Predicate<RedisMessage> filter;
protected Duration timeoutDuration = Duration.ofSeconds(5);

public RedisCallbackBuilder(@NotNull RedisManager redis,
@NotNull String requestChannel, @NotNull ByteArrayDataOutput requestData) {
this.redis = redis;
this.requestChannel = requestChannel;
this.requestData = requestData;
}

public <R> CompletableFuture<R> response(@NotNull String channel,
@NotNull Function<RedisMessage, R> handler) {
CompletableFuture<R> future = new CompletableFuture<>();
RedisMessageListener listener = message -> {
if (filter != null && !filter.test(message)) return;
future.complete(handler.apply(message));
};
redis.registerChannelListener(listener, channel);
redis.publish(requestChannel, requestData);
return future.whenComplete((r, e) -> redis.unregisterListener(listener));
}

public RedisCallbackBuilder filter(@NotNull Predicate<RedisMessage> filter) {
this.filter = filter;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import cc.carm.plugin.mineredis.MineRedis;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteStreams;
import org.jetbrains.annotations.NotNull;

import java.util.function.Function;
Expand All @@ -12,14 +13,14 @@ public class RedisMessage {
protected final @NotNull String sourceServerID;
protected final long timestamp;

protected final ByteArrayDataInput data;
protected final byte[] rawData;

public RedisMessage(@NotNull String channel, @NotNull String sourceServerID,
long timestamp, ByteArrayDataInput data) {
long timestamp, byte[] raw) {
this.channel = channel;
this.sourceServerID = sourceServerID;
this.timestamp = timestamp;
this.data = data;
this.rawData = raw;
}

/**
Expand All @@ -43,8 +44,13 @@ public long getTimestamp() {
return timestamp;
}

public byte[] getRawData() {
return rawData;
}

@SuppressWarnings("UnstableApiUsage")
public ByteArrayDataInput getData() {
return data;
return ByteStreams.newDataInput(rawData);
}

public <T> T apply(@NotNull Function<ByteArrayDataInput, T> handler) {
Expand Down
24 changes: 24 additions & 0 deletions api/src/test/java/RedisCallback.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import cc.carm.plugin.mineredis.MineRedis;

import java.util.UUID;

public class RedisCallback {

public void demo() {
UUID requestID = UUID.randomUUID();
MineRedis.request("test.request", out -> {
out.writeUTF(requestID.toString());
out.writeUTF("test");
})
.filter(message -> message.getData().readUTF().equals(requestID.toString())) // 限制条件
.response("test.response", message -> { //
System.out.println("response: " + message.getData().readUTF());
return message.getData().readUTF();
}) // 如有收到了符合条件的反馈,则读取结果
.thenAccept(System.out::println) // 使用结果
.exceptionally(throwable -> {
throwable.printStackTrace();
return null;
});
}
}

0 comments on commit 548bb84

Please sign in to comment.