-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathCountingServer.java
99 lines (84 loc) · 3.49 KB
/
CountingServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package io.github.stomp.server;
import io.github.stomp.StompFrame;
import io.github.stomp.StompServer;
import io.github.stomp.StompUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
/**
 * STOMP server exposed on {@link #COUNTING_WEBSOCKET_PATH} that, for every subscription a client
 * opens, periodically pushes a counter message to that client.
 *
 * <p>Each WebSocket session gets one outbound sink (created in {@link #addWebSocketSources}) and
 * one running interval per subscription id (started in {@link #onSubscribe}). Both are tracked in
 * concurrent maps keyed by session id and released in {@link #onUnsubscribe} / {@link #doFinally}.
 */
@Component
public class CountingServer implements StompServer {

    /** Scheduler passed to {@code subscribeOn} for every counting subscription. */
    private static final Scheduler COUNTING_SCHEDULER = Schedulers.boundedElastic();

    /** WebSocket path returned by {@link #path()}. */
    public static final String COUNTING_WEBSOCKET_PATH = "/count";

    /** Period, in milliseconds, between two consecutive counter ticks. */
    private static final long DELAY_MILLIS = 500L;

    /** Running counters: session id -> (subscription id -> handle used to cancel the interval). */
    private final Map<String, Map<String, Disposable>> subscriptions = new ConcurrentHashMap<>();

    /** Outbound sink per session id; counter frames are pushed into it. */
    private final Map<String, Sinks.Many<StompFrame>> sinks = new ConcurrentHashMap<>();

    /**
     * Builds the frame sent for one counter tick via {@code StompUtils.makeMessage}.
     *
     * @param destination    destination the subscription was made on
     * @param subscriptionId id of the subscription the frame is addressed to
     * @param i              tick number; any value that is not strictly positive yields the body
     *                       {@code "Starting count"}, otherwise the body is the decimal value of {@code i}
     * @return the frame produced by {@code StompUtils.makeMessage}
     */
    public static StompFrame generateCountMessage(final String destination, final String subscriptionId, final long i) {
        return StompUtils.makeMessage(destination, subscriptionId, 0 < i ? String.valueOf(i) : "Starting count");
    }

    /**
     * @return {@link #COUNTING_WEBSOCKET_PATH}
     */
    @Override
    public String path() {
        return COUNTING_WEBSOCKET_PATH;
    }

    /**
     * Supplies the outbound frame source for a session, creating the session's unicast sink on
     * first use.
     *
     * @param session the WebSocket session being set up
     * @return a single-element list holding the flux view of the session's sink
     */
    @Override
    public Mono<List<Flux<StompFrame>>> addWebSocketSources(final WebSocketSession session) {
        final Sinks.Many<StompFrame> sink = this.sinks.computeIfAbsent(
                session.getId(),
                k -> Sinks.many().unicast().onBackpressureBuffer()
        );
        return Mono.just(Collections.singletonList(sink.asFlux()));
    }

    /**
     * Cancels every counter still running for the session and forgets its sink, then delegates to
     * the default {@code StompServer} implementation.
     */
    @Override
    public Mono<Void> doFinally(final WebSocketSession session, final Map<String, Tuple2<AckMode, Queue<String>>> subscriptionCache, final Map<String, StompFrame> frameCache) {
        final String sessionId = session.getId();

        final Map<String, Disposable> sessionSubscriptions = this.subscriptions.remove(sessionId);
        if (sessionSubscriptions != null) {
            // ConcurrentHashMap never stores null values, so every entry can be disposed directly.
            sessionSubscriptions.values().forEach(Disposable::dispose);
        }

        this.sinks.remove(sessionId);
        return StompServer.super.doFinally(session, subscriptionCache, frameCache);
    }

    /**
     * Starts a counter for the new subscription and registers it so it can be cancelled later,
     * then delegates to the default {@code StompServer} implementation.
     *
     * <p>If a counter is already registered under the same session and subscription id it is
     * disposed; otherwise the old interval would keep emitting with no remaining handle to stop it.
     */
    @Override
    public Mono<StompFrame> onSubscribe(final WebSocketSession session, final StompFrame inbound, final StompFrame outbound, final String destination, final String subscriptionId) {
        final String sessionId = session.getId();
        final Map<String, Disposable> sessionSubscriptions =
                this.subscriptions.computeIfAbsent(sessionId, k -> new ConcurrentHashMap<>());

        // NOTE(review): subscribeOn only relocates the subscribe call; per the Reactor docs the
        // ticks themselves run on Flux.interval's default timer scheduler. Confirm whether
        // Flux.interval(Duration, Scheduler) was intended.
        final Disposable counter = Flux.interval(Duration.ofMillis(DELAY_MILLIS))
                .doOnNext(i -> this.emitCount(sessionId, destination, subscriptionId, i))
                .subscribeOn(COUNTING_SCHEDULER)
                .subscribe();

        final Disposable previous = sessionSubscriptions.put(subscriptionId, counter);
        if (previous != null) {
            previous.dispose();
        }

        return StompServer.super.onSubscribe(session, inbound, outbound, destination, subscriptionId);
    }

    /**
     * Cancels the counter registered for the given subscription, if any, then delegates to the
     * default {@code StompServer} implementation.
     */
    @Override
    public Mono<StompFrame> onUnsubscribe(final WebSocketSession session, final StompFrame inbound, final StompFrame outbound, final String subscriptionId) {
        final Map<String, Disposable> sessionSubscriptions = this.subscriptions.get(session.getId());
        if (sessionSubscriptions != null) {
            final Disposable counter = sessionSubscriptions.remove(subscriptionId);
            if (counter != null) {
                counter.dispose();
            }
        }
        return StompServer.super.onUnsubscribe(session, inbound, outbound, subscriptionId);
    }

    /**
     * Pushes one counter frame into the session's sink; does nothing when the session has no sink
     * (not yet set up, or already cleaned up).
     */
    private void emitCount(final String sessionId, final String destination, final String subscriptionId, final long i) {
        final Sinks.Many<StompFrame> sink = this.sinks.get(sessionId);
        if (sink == null) {
            return;
        }
        final StompFrame frame = generateCountMessage(destination, subscriptionId, i);
        // Several subscriptions of one session tick independently and share this sink. Reactor
        // sinks reject overlapping tryEmitNext calls (FAIL_NON_SERIALIZED), which would silently
        // drop the frame, so emissions are serialized per sink. Delivery stays best-effort: any
        // other failure result (e.g. sink already cancelled) is intentionally ignored.
        synchronized (sink) {
            sink.tryEmitNext(frame);
        }
    }
}