-
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(msg): Support multi-channel messages registering.
- Loading branch information
Showing
14 changed files
with
309 additions
and
55 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
74 changes: 74 additions & 0 deletions
74
api/src/main/java/cc/carm/plugin/mineredis/api/channel/RedisChannel.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package cc.carm.plugin.mineredis.api.channel; | ||
|
||
import cc.carm.plugin.mineredis.MineRedis; | ||
import cc.carm.plugin.mineredis.api.message.RedisMessage; | ||
import cc.carm.plugin.mineredis.api.message.RedisMessageListener; | ||
import cc.carm.plugin.mineredis.api.message.PreparedRedisMessage; | ||
import com.google.common.io.ByteArrayDataOutput; | ||
import io.lettuce.core.RedisFuture; | ||
import org.jetbrains.annotations.NotNull; | ||
import org.jetbrains.annotations.Nullable; | ||
|
||
import java.util.function.Consumer; | ||
import java.util.function.Function; | ||
import java.util.function.Predicate; | ||
|
||
public class RedisChannel implements RedisMessageListener { | ||
|
||
public static RedisChannelBuilder builder(String channel) { | ||
return new RedisChannelBuilder(channel); | ||
} | ||
|
||
protected final @NotNull String channel; | ||
protected final @Nullable Predicate<RedisMessage> filter; | ||
protected final @Nullable Function<RedisMessage, PreparedRedisMessage> handler; | ||
|
||
public RedisChannel(@NotNull String channel, | ||
@Nullable Predicate<RedisMessage> filter, | ||
@Nullable Function<RedisMessage, PreparedRedisMessage> handler) { | ||
this.channel = channel; | ||
this.filter = filter; | ||
this.handler = handler; | ||
} | ||
|
||
public @NotNull String getChannel() { | ||
return this.channel; | ||
} | ||
|
||
@Override | ||
public void handle(RedisMessage message) { | ||
if (handler == null) return; | ||
if (!channel.equals(message.getChannel())) return; | ||
if (filter != null && !filter.test(message)) return; | ||
PreparedRedisMessage response = handler.apply(message); | ||
if (response != null) response.publish(); | ||
} | ||
|
||
public RedisFuture<Long> publishAsync(@NotNull ByteArrayDataOutput data) { | ||
return MineRedis.publishAsync(channel, data); | ||
} | ||
|
||
|
||
public RedisFuture<Long> publishAsync(@NotNull Consumer<ByteArrayDataOutput> data) { | ||
return MineRedis.publishAsync(channel, data); | ||
} | ||
|
||
|
||
public RedisFuture<Long> publishAsync(Object... values) { | ||
return MineRedis.publishAsync(channel, values); | ||
} | ||
|
||
|
||
public long publish(@NotNull Object... values) { | ||
return MineRedis.publish(channel, values); | ||
} | ||
|
||
public long publish(@NotNull ByteArrayDataOutput data) { | ||
return MineRedis.publish(channel, data); | ||
} | ||
|
||
public long publish(@NotNull Consumer<ByteArrayDataOutput> data) { | ||
return MineRedis.publish(channel, data); | ||
} | ||
|
||
} |
40 changes: 40 additions & 0 deletions
40
api/src/main/java/cc/carm/plugin/mineredis/api/channel/RedisChannelBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package cc.carm.plugin.mineredis.api.channel; | ||
|
||
import cc.carm.plugin.mineredis.api.message.RedisMessage; | ||
import cc.carm.plugin.mineredis.api.message.PreparedRedisMessage; | ||
|
||
import java.util.function.Consumer; | ||
import java.util.function.Function; | ||
import java.util.function.Predicate; | ||
|
||
public class RedisChannelBuilder { | ||
|
||
protected final String channel; | ||
protected Predicate<RedisMessage> filter = m -> !m.isLocalMessage(); | ||
|
||
public RedisChannelBuilder(String channel) { | ||
this.channel = channel; | ||
} | ||
|
||
|
||
public RedisChannelBuilder filter(Predicate<RedisMessage> filter) { | ||
return setFilter(this.filter == null ? filter : this.filter.and(filter)); | ||
} | ||
|
||
public RedisChannelBuilder setFilter(Predicate<RedisMessage> filter) { | ||
this.filter = filter; | ||
return this; | ||
} | ||
|
||
public RedisChannel handle(Consumer<RedisMessage> handler) { | ||
return handle(m -> { | ||
handler.accept(m); | ||
return null; | ||
}); | ||
} | ||
|
||
public RedisChannel handle(Function<RedisMessage, PreparedRedisMessage> handler) { | ||
return new RedisChannel(channel, filter, handler); | ||
} | ||
|
||
} |
52 changes: 52 additions & 0 deletions
52
api/src/main/java/cc/carm/plugin/mineredis/api/message/PreparedRedisMessage.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package cc.carm.plugin.mineredis.api.message; | ||
|
||
import cc.carm.plugin.mineredis.MineRedis; | ||
import cc.carm.plugin.mineredis.api.RedisMessageManager; | ||
import com.google.common.io.ByteArrayDataOutput; | ||
import com.google.common.io.ByteStreams; | ||
import org.jetbrains.annotations.NotNull; | ||
|
||
import java.util.Arrays; | ||
import java.util.function.Consumer; | ||
|
||
public class PreparedRedisMessage { | ||
|
||
public static PreparedRedisMessage of(@NotNull String channel, @NotNull ByteArrayDataOutput data) { | ||
return new PreparedRedisMessage(channel, data); | ||
} | ||
|
||
public static PreparedRedisMessage of(@NotNull String channel, @NotNull Consumer<ByteArrayDataOutput> data) { | ||
ByteArrayDataOutput output = ByteStreams.newDataOutput(); | ||
data.accept(output); | ||
return of(channel, output); | ||
} | ||
|
||
public static PreparedRedisMessage of(@NotNull String channel, @NotNull Object... values) { | ||
return of(channel, o -> RedisMessageManager.writeParams(o, Arrays.asList(values))); | ||
} | ||
|
||
protected final @NotNull String channel; | ||
protected final @NotNull ByteArrayDataOutput data; | ||
|
||
public PreparedRedisMessage(@NotNull String channel, @NotNull ByteArrayDataOutput data) { | ||
this.channel = channel; | ||
this.data = data; | ||
} | ||
|
||
public String channel() { | ||
return channel; | ||
} | ||
|
||
public ByteArrayDataOutput data() { | ||
return data; | ||
} | ||
|
||
public void publish() { | ||
MineRedis.publish(channel, data); | ||
} | ||
|
||
public void publishAsync() { | ||
MineRedis.publishAsync(channel, data); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.