-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHelloWorldServer.java
59 lines (48 loc) · 2.22 KB
/
HelloWorldServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package io.github.stomp.server;
import io.github.stomp.StompFrame;
import io.github.stomp.StompServer;
import io.github.stomp.StompUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeType;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.function.Tuple2;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class HelloWorldServer implements StompServer {
public static final String COUNTING_WEBSOCKET_PATH = "/hello-world";
private final Map<String, Sinks.Many<StompFrame>> sinks = new ConcurrentHashMap<>();
public static StompFrame generateHelloWorldMessage(final String destination, final String subscriptionId, final Charset charset) {
return StompUtils.makeMessage(destination, subscriptionId, new MimeType("text", "plain", charset), "Hello World!".getBytes(charset));
}
@Override
public String path() {
return COUNTING_WEBSOCKET_PATH;
}
@Override
public Mono<List<Flux<StompFrame>>> addWebSocketSources(final WebSocketSession session) {
return Mono.just(
Collections.singletonList(
this.sinks.computeIfAbsent(session.getId(), k -> Sinks.many().unicast().onBackpressureBuffer()).asFlux()
)
);
}
@Override
public Mono<Void> doFinally(final WebSocketSession session, final Map<String, Tuple2<AckMode, Queue<String>>> subscriptionCache, final Map<String, StompFrame> frameCache) {
this.sinks.remove(session.getId());
return StompServer.super.doFinally(session, subscriptionCache, frameCache);
}
@Override
public Mono<StompFrame> onSubscribe(final WebSocketSession session, final StompFrame inbound, final StompFrame outbound, final String destination, final String subscriptionId) {
final Charset charset = Optional.ofNullable(inbound.bodyCharset()).orElse(StompFrame.DEFAULT_CHARSET);
final Sinks.Many<StompFrame> sink = this.sinks.get(session.getId());
if (sink != null) {
sink.tryEmitNext(generateHelloWorldMessage(destination, subscriptionId, charset));
}
return StompServer.super.onSubscribe(session, inbound, outbound, destination, subscriptionId);
}
}