From 7975f7b41b7e25d12c352f1539afc68635f29c75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 11 Apr 2024 14:38:31 +0200 Subject: [PATCH 1/2] Initial rabbit_web work --- deps/rabbit/src/rabbit_networking.erl | 1 + deps/rabbit_common/src/rabbit_net.erl | 45 +++- .../src/rabbit_mqtt_keepalive.erl | 6 +- .../src/rabbit_mqtt_processor.erl | 5 + deps/rabbitmq_web_dispatch/src/rabbit_web.erl | 217 ++++++++++++++++++ .../src/rabbit_web_ws_h.erl | 100 ++++++++ .../src/rabbit_web_mqtt_app.erl | 47 +++- .../src/rabbit_web_mqtt_handler.erl | 65 ++---- rabbitmq-components.mk | 4 +- 9 files changed, 421 insertions(+), 69 deletions(-) create mode 100644 deps/rabbitmq_web_dispatch/src/rabbit_web.erl create mode 100644 deps/rabbitmq_web_dispatch/src/rabbit_web_ws_h.erl diff --git a/deps/rabbit/src/rabbit_networking.erl b/deps/rabbit/src/rabbit_networking.erl index 4563da64d1b4..6f09714500f5 100644 --- a/deps/rabbit/src/rabbit_networking.erl +++ b/deps/rabbit/src/rabbit_networking.erl @@ -132,6 +132,7 @@ boot_tls(NumAcceptors, ConcurrentConnsSupsCount) -> -spec ensure_ssl() -> rabbit_types:infos(). +%% @todo There is no need to start ssl as it is a dependency of Ranch. ensure_ssl() -> {ok, SslAppsConfig} = application:get_env(rabbit, ssl_apps), ok = app_utils:start_applications(SslAppsConfig), diff --git a/deps/rabbit_common/src/rabbit_net.erl b/deps/rabbit_common/src/rabbit_net.erl index 494126795740..1578591860c9 100644 --- a/deps/rabbit_common/src/rabbit_net.erl +++ b/deps/rabbit_common/src/rabbit_net.erl @@ -28,6 +28,9 @@ -type ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any()). -type ok_or_any_error() :: rabbit_types:ok_or_error(any()). -type socket() :: port() | ssl:sslsocket(). +-type web_socket() :: #{ + %% @todo +}. -type opts() :: [{atom(), any()} | {raw, non_neg_integer(), non_neg_integer(), binary()}]. -type hostname() :: inet:hostname(). @@ -60,15 +63,16 @@ -spec send(socket(), iodata()) -> ok_or_any_error(). -spec close(socket()) -> ok_or_any_error(). -spec fast_close(socket()) -> ok_or_any_error(). --spec sockname(socket()) -> +-spec sockname(socket() | web_socket()) -> ok_val_or_error({inet:ip_address(), ip_port()}). --spec peername(socket()) -> +-spec peername(socket() | web_socket()) -> ok_val_or_error({inet:ip_address(), ip_port()}). --spec peercert(socket()) -> +-spec peercert(socket() | web_socket()) -> 'nossl' | ok_val_or_error(rabbit_cert_info:certificate()). --spec connection_string(socket(), 'inbound' | 'outbound') -> +-spec connection_string(socket() | web_socket(), 'inbound' | 'outbound') -> ok_val_or_error(string()). -% -spec socket_ends(socket() | ranch_proxy:proxy_socket() | ranch_proxy_ssl:ssl_socket(), +%% @todo Fix rather than comment? +% -spec socket_ends(socket() | web_socket()| ranch_proxy:proxy_socket() | ranch_proxy_ssl:ssl_socket(), % 'inbound' | 'outbound') -> % ok_val_or_error({host_or_ip(), ip_port(), % host_or_ip(), ip_port()}). @@ -203,13 +207,17 @@ fast_close(Sock) when is_port(Sock) -> catch port_close(Sock), ok. sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock); -sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). +sockname(Sock) when is_port(Sock) -> inet:sockname(Sock); +sockname(#{sock := Sock}) -> {ok, Sock}. peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock); -peername(Sock) when is_port(Sock) -> inet:peername(Sock). +peername(Sock) when is_port(Sock) -> inet:peername(Sock); +peername(#{peer := Peer}) -> {ok, Peer}. peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock); -peercert(Sock) when is_port(Sock) -> nossl. +peercert(Sock) when is_port(Sock) -> nossl; +peercert(#{cert := undefined}) -> nossl; +peercert(#{cert := Cert}) -> {ok, Cert}. connection_string(Sock, Direction) -> case socket_ends(Sock, Direction) of @@ -222,6 +230,7 @@ connection_string(Sock, Direction) -> Error end. +%% @todo This doesn't accept a TCP socket somehow? socket_ends(Sock, Direction) when ?IS_SSL(Sock); is_port(Sock) -> {From, To} = sock_funs(Direction), @@ -240,6 +249,7 @@ socket_ends({rabbit_proxy_socket, Sock, ProxyInfo}, Direction) -> #{command := local} -> socket_ends(Sock, Direction); %% PROXY header: use the IP/ports from the proxy header. + %% @todo This doesn't correctly take Direction into account? #{ src_address := FromAddress, src_port := FromPort, @@ -248,7 +258,24 @@ socket_ends({rabbit_proxy_socket, Sock, ProxyInfo}, Direction) -> } -> {ok, {rdns(FromAddress), FromPort, rdns(ToAddress), ToPort}} - end. + end; +%% @todo We don't take Direction into account. +socket_ends(#{proxy_header := #{command := proxy} = ProxyHeader}, _Direction) -> + #{ + src_address := FromAddress, + src_port := FromPort, + dest_address := ToAddress, + dest_port := ToPort + } = ProxyHeader, + {ok, {rdns(FromAddress), FromPort, + rdns(ToAddress), ToPort}}; +socket_ends(WebSocket, _Direction) -> + #{ + peer := {FromAddress, FromPort}, + sock := {ToAddress, ToPort} + } = WebSocket, + {ok, {rdns(FromAddress), FromPort, + rdns(ToAddress), ToPort}}. maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr); maybe_ntoab(Host) -> Host. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_keepalive.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_keepalive.erl index 6b7b94b54c23..66be10a24744 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_keepalive.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_keepalive.erl @@ -23,9 +23,13 @@ init() -> disabled. --spec start(IntervalSeconds :: non_neg_integer(), inet:socket()) -> ok. +-spec start(IntervalSeconds :: non_neg_integer(), inet:socket() | rabbit_net:web_socket()) -> ok. start(0, _Sock) -> ok; +%% Temporarily disable the keep-alive mechanism for WebSocket. +%% @todo Implement an alternative that doesn't require polling the socket. +start(_, #{} = _WebSocket) -> + ok; start(Seconds, Sock) when is_integer(Seconds) andalso Seconds > 0 -> self() ! {keepalive, {init, Seconds, Sock}}, diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 45cfc3676091..718492b168d0 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -2288,6 +2288,11 @@ mailbox_soft_limit_exceeded() -> false end. +is_socket_busy(#{} = _WebSocket) -> + %% We cannot get socket stats for a WebSocket as + %% the real socket is a few layers below us when + %% using HTTP/2+. + false; is_socket_busy(Socket) -> case rabbit_net:getstat(Socket, [send_pend]) of {ok, [{send_pend, NumBytes}]} diff --git a/deps/rabbitmq_web_dispatch/src/rabbit_web.erl b/deps/rabbitmq_web_dispatch/src/rabbit_web.erl new file mode 100644 index 000000000000..b0379d9f9717 --- /dev/null +++ b/deps/rabbitmq_web_dispatch/src/rabbit_web.erl @@ -0,0 +1,217 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +%% @todo After all is done I would like to rename +%% rabbit_web_dispatch to simply rabbit_web. +%% That application will provide the core functionality +%% of all RabbitMQ Web applications. + +%% @todo It would be great to have a shared rabbit_web +%% listener. Since all Web applications depend on +%% rabbit_web, it would start first, and the +%% other applications would just update it with +%% additional routes and Websocket endpoints. +%% In these applications they would just specify +%% listener information as "rabbit_web". + +-module(rabbit_web). + +-export([start_listeners/3]). +-export([get_websocket_config_for_ref/1]). +-export([ws_route/1]). + +%% We create listeners in sets. From the configuration +%% we obtain a BaseRef (identifying the set) and a +%% number of different transports and transport options. +%% In addition we deduct the transport family by doing +%% a lookup. We may as a result create an IPv4 listener, +%% an IPv6 listener, or both in dual stack setups. We +%% go one step further: we group the TCP, TLS and QUIC +%% listeners to have a single start function for all +%% of them. This is because we want to automatically +%% populate Alt-Svc with those alternate services, so +%% that clients can choose the most appropriate +%% listener for their needs. This is also required +%% for browsers to discover QUIC endpoints- the only +%% alternative being the HTTPS DNS record. +%% +%% In the future we may want to add support for local +%% sockets. The name scheme allows for that. +%% +%% Should this model be applied to all listeners- and not +%% just Web listeners- then the following listeners could +%% be created: +%% +%% BaseRef :: amqp_legacy | amqp | mqtt | stomp | stream +%% | web_mqtt | web_stomp | management +%% Transport :: tcp | tls | quic +%% Num :: pos_integer() +%% Family :: inet | inet6 | local +%% Ref :: {BaseRef, Transport, Num, Family} +%% +%% There could be additional existing or new listeners +%% such as web_amqp, web_stream and more. +%% +%% The Num variable is there to allow configuring multiple +%% listeners for the same protocol. For example one could +%% have a listeners.tcp.1, listeners.tcp.2 and listeners.tcp.3 +%% in their configuration file. +%% +%% There is no need to distinguish between different +%% protocols that use the same transport; Cowboy doesn't +%% either. So if 'tls' is used then the protocol options +%% will be shared between HTTP/1.1 and HTTP/2 and the +%% appropriate protocol selected using ALPN. On the +%% other hand when transports differ so may protocol +%% configuration. One could imagine HTTP force-redirecting +%% to HTTPS as one example of use case. +%% +%% We also provide the ability to share the Websocket endpoint +%% between different plugins. Because Websocket is built to +%% handle multiple sub-protocols, and plugins typically +%% implement their own set, we can use a single Websocket +%% endpoint for all of them, if configured to do so. +%% +%% We provide information about the Websocket endpoint in +%% the protocol options and keep it around in a persistent +%% term. + +-type websocket_config() :: #{ + %% @todo Have an option that allows selecting one without a header? For STOMP. + %% Perhaps only accept a default if there is only 1 handler defined for a ref. + protocols := [binary()], + handler := module(), + opts := any() +}. + +-type config() :: #{ + %% The configuration must include at least one listener. + %% + %% The listener number is the position in the list. Take extra + %% care to match with the configuration entry (tcp.listener.1) + %% for easier debugging. + tcp | tls | quic := [rabbit_networking:listening_config()], + tcp_opts => todo, + tls_opts => todo, + quic_opts => todo, + %% For the time being we only support a single Websocket endpoint + %% per listener, and we expect that endpoint to be "/ws". If it later + %% becomes necessary to support more (why though?), we can either + %% extend the map or wrap the map in a list of endpoints. + websocket => websocket_config() +}. + +%% The type of the produced listener ref. + +-type ref() :: {any(), tcp | tls | quic, pos_integer(), inet | inet6}. + +%% -- + +-spec start_listeners(atom(), config(), map()) + -> {ok, [ref()]}. + +start_listeners(BaseRef, Transports, ProtoOpts0) -> + %% Add common rabbit_web components to protocol options. + ProtoOpts = prepare_proto_opts(ProtoOpts0), + %% In order to populate Alt-Svc and other alternative + %% services mechanisms, we must find whether these + %% alternative services are defined in the transports. + %% In general we want to advertise later versions of + %% HTTP in previous versions: for example HTTP/1.1 + %% advertises HTTP/2 and HTTP/3; while HTTP/2 can + %% advertise HTTP/3 only. So we start QUIC first. + {QuicRefs, QuicAltSvcs} = start_quic(BaseRef, Transports, ProtoOpts), + %% Then we start TLS as it may include HTTP/2. + {TLSRefs, TLSAltSvcs} = start_tls(BaseRef, Transports, ProtoOpts, + QuicAltSvcs), + %% Finally TCP. + TCPRefs = start_tcp(BaseRef, Transports, ProtoOpts, + QuicAltSvcs ++ TLSAltSvcs), + %% Next we want to keep the Websocket protocols supported. + store_websocket_info(BaseRef, Transports), + %% We return all refs. + {ok, TCPRefs ++ TLSRefs ++ QuicRefs}. + +prepare_proto_opts(Opts) -> + Opts#{ + %% @todo Middlewares? + %% @todo Access logs etc. Use metrics_h? + stream_handlers => [cowboy_stream_h], + enable_connect_protocol => true + }. + +%% @todo Implement QUIC. +start_quic(_, _, _) -> + {[], []}. + +%% @todo Implement TLS. +start_tls(_, _, _, _) -> + {[], []}. + +start_tcp(BaseRef, #{tcp := Listeners, tcp_opts := TransOpts}, ProtoOpts, []) -> + %% No alternative services. + start_tcp(BaseRef, Listeners, 1, TransOpts, ProtoOpts, []); +start_tcp(BaseRef, #{tcp := Listeners, tcp_opts := TransOpts}, ProtoOpts, AltSvcs) -> + start_tcp(BaseRef, Listeners, 1, TransOpts, ProtoOpts#{ + %% The Alt-Svc header will be set by our stream handler. + alternative_services => AltSvcs + }, []); +%% There were no TCP listeners. +start_tcp(_, _, _, _) -> + []. + +start_tcp(_, [], _, _, _, Acc) -> + lists:reverse(Acc); +start_tcp(BaseRef, [Listener|Tail], Num, TransOpts, ProtoOpts, Acc0) -> + Addresses = rabbit_networking:tcp_listener_addresses(Listener), + Acc = start_tcp_addrs(BaseRef, Addresses, Num, TransOpts, ProtoOpts, Acc0), + start_tcp(BaseRef, Tail, Num + 1, TransOpts, ProtoOpts, Acc). + +start_tcp_addrs(_, [], _, _, _, Acc) -> + Acc; +start_tcp_addrs(BaseRef, [Addr|Tail], Num, TransOpts, ProtoOpts, Acc) -> + {_Host, _Port, Family} = Addr, + Ref = {BaseRef, tcp, Num, Family}, + {ok, _} = cowboy:start_clear(Ref, TransOpts, ProtoOpts), + start_tcp_addrs(BaseRef, Tail, Num, TransOpts, ProtoOpts, [Ref|Acc]). + +store_websocket_info(BaseRef, #{websocket := WsConfig}) -> + %% @todo This should be done in a single process to avoid concurrency issues. + Terms = persistent_term:get(rabbit_websocket, []), + persistent_term:put(rabbit_websocket, [{BaseRef, WsConfig}|Terms]); +store_websocket_info(_, _) -> + ok. + +-spec get_websocket_config_for_ref(ref()) -> [websocket_config()]. + +get_websocket_config_for_ref({ThisBaseRef, _, _, _}) -> + Terms = persistent_term:get(rabbit_websocket, []), + [WsConfig || {BaseRef, WsConfig} <- Terms, BaseRef =:= ThisBaseRef]. + +%% It is occasionally necessary to update the list of routes +%% for Web listeners. For example a use case could be for +%% the Web-MQTT example application to add its routes to +%% the existing Web-MQTT listener. +%% +%% Because listeners can be configured in sets (due to +%% starting two listeners when using a dual IPv4/v6 stack), +%% and because the same listener isn't expected to serve +%% different routes on the same set of listeners, this +%% function takes only a BaseRef and applies the changes +%% to all the listeners it finds. + +%update_listener_routes(BaseRef, fun(Dispatch) -> Dispatch end) +% todo. + +%% We use a common module to initiate all Websocket connections. +%% This module will select the appropriate Websocket handler +%% based on the requested sub-protocol. + +-spec ws_route(Path) -> {Path, rabbit_web_ws_h, []}. + +ws_route(Path) -> + {Path, rabbit_web_ws_h, []}. diff --git a/deps/rabbitmq_web_dispatch/src/rabbit_web_ws_h.erl b/deps/rabbitmq_web_dispatch/src/rabbit_web_ws_h.erl new file mode 100644 index 000000000000..b4c82cb5cfdd --- /dev/null +++ b/deps/rabbitmq_web_dispatch/src/rabbit_web_ws_h.erl @@ -0,0 +1,100 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +%% The purpose of this module is to select a sub-protocol +%% and switch to the handler for the selected sub-protocol. +%% This module makes it possible for a single Websocket +%% endpoint to be shared between multiple plugins, for +%% example Web-MQTT and Web-STOMP. + +-module(rabbit_web_ws_h). +-behaviour(cowboy_handler). +-behaviour(cowboy_sub_protocol). + +%% HTTP handler. +-export([init/2]). + +%% Cowboy sub-protocol. +-export([upgrade/4]). +-export([upgrade/5]). +-export([takeover/7]). + +-include_lib("kernel/include/logger.hrl"). + +%% HTTP handler. + +init(Req = #{ref := Ref}, _) -> + case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of + undefined -> + no_supported_sub_protocol(undefined, Req); + ReqProtocols -> + Config = rabbit_web:get_websocket_config_for_ref(Ref), + ?LOG_ERROR("Config: ~tp", [Config]), + case select_sub_protocol(ReqProtocols, Config) of + false -> + no_supported_sub_protocol(ReqProtocols, Req); + {Proto, Handler, WsOpts} -> + %% We call the handler's init function to initialise + %% its state. The module returned MUST be this module. + %% @todo What do we give as initial state? + {?MODULE, Req1, HandlerState} = Handler:init(Req, []), + %% We must use the Cowboy protocol interface to + %% switch to Websocket using a new handler module. + Req2 = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, Proto, Req1), + {?MODULE, Req2, {Handler, HandlerState}, WsOpts} + end + end. + +select_sub_protocol([], _) -> + false; +select_sub_protocol([Proto|Tail], Config) -> + case select_sub_protocol1(Proto, Config) of + false -> + select_sub_protocol(Tail, Config); + Selected -> + Selected + end. + +select_sub_protocol1(_, []) -> + false; +select_sub_protocol1(Proto, [WsConfig|Tail]) -> + #{ + protocols := Protocols, + handler := Handler, + opts := WsOpts + } = WsConfig, + case lists:member(Proto, Protocols) of + true -> + {Proto, Handler, WsOpts}; + false -> + select_sub_protocol1(Proto, Tail) + end. + +no_supported_sub_protocol(ReqProtocols, Req) -> + %% @todo Figure out what to say in the error message. + %% @todo Probably pass an option with a listener name as string or something. + ?LOG_ERROR("Web: ~tp", [ReqProtocols]), + %% The client MUST include “mqtt” in the list of WebSocket Sub Protocols it offers [MQTT-6.0.0-3]. +% ?LOG_ERROR("Web MQTT: 'mqtt' not included in client offered subprotocols: ~tp", [Protocol]), + {ok, + %% @todo Connection: close is invalid for HTTP/2. Just don't close? + cowboy_req:reply(400, #{<<"connection">> => <<"close">>}, Req), + undefined}. + +%% Cowboy sub-protocol. + +%% cowboy_sub_protcol +upgrade(Req, Env, Handler, HandlerState) -> + upgrade(Req, Env, Handler, HandlerState, #{}). + +%% We set the Handler to switch to in our HandlerState. +%% Now is the time to switch handlers. +upgrade(Req, Env, ?MODULE, {Handler, HandlerState}, Opts) -> + cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts). + +takeover(Parent, Ref, Socket, Transport, Opts, Buffer, HandlerInfo) -> + cowboy_websocket:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, HandlerInfo). diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl index fc6424ffae4f..49b6ca97a9cf 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl @@ -42,6 +42,7 @@ prep_stop(State) -> -spec stop(_) -> ok. stop(_State) -> + %% @todo Do the rabbit_web equivalent. _ = rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL), _ = rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL), ok. @@ -78,20 +79,46 @@ emit_connection_info(Items, Ref, AggregatorPid, Pids) -> %% mqtt_init() -> - CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])), - CowboyWsOpts = maps:from_list(get_env(cowboy_ws_opts, [])), + CowboyWsOpts0 = maps:from_list(get_env(cowboy_ws_opts, [])), + CowboyWsOpts = CowboyWsOpts0#{compress => true}, TcpConfig = get_env(tcp_config, []), - SslConfig = get_env(ssl_config, []), +% SslConfig = get_env(ssl_config, []), Routes = cowboy_router:compile([{'_', [ - {get_env(ws_path, "/ws"), rabbit_web_mqtt_handler, [{ws_opts, CowboyWsOpts}]} + rabbit_web:ws_route(get_env(ws_path, "/ws")), + {"/web-mqtt-examples/[...]", cowboy_static, {priv_dir, rabbitmq_web_mqtt_examples, "", []}} ]}]), + CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])), CowboyOpts = CowboyOpts0#{ - env => #{dispatch => Routes}, - proxy_header => get_env(proxy_protocol, false), - stream_handlers => [rabbit_web_mqtt_stream_handler, cowboy_stream_h] - }, - start_tcp_listener(TcpConfig, CowboyOpts), - start_tls_listener(SslConfig, CowboyOpts). + env => #{dispatch => Routes}, + proxy_header => get_env(proxy_protocol, false) + }, + rabbit_web:start_listeners(rabbit_web_mqtt, #{ + tcp => [listener_config(TcpConfig)], + tcp_opts => #{ + socket_opts => TcpConfig, + max_connections => get_max_connections(), + num_acceptors => get_env(num_tcp_acceptors, 10), + num_conns_sups => get_env(num_conns_sup, 1) + }, + websocket => #{ + protocols => [<<"mqtt">>], + handler => rabbit_web_mqtt_handler, + opts => CowboyWsOpts + } + }, CowboyOpts). + +listener_config(Config) -> + Port = proplists:get_value(port, Config, 15675), + case proplists:get_value(ip, Config) of + undefined -> + Port; + IpStr -> + {normalize_ip(IpStr), Port} + end. + + +% start_tcp_listener(TcpConfig, CowboyOpts), +% start_tls_listener(SslConfig, CowboyOpts). start_tcp_listener([], _) -> ok; start_tcp_listener(TCPConf0, CowboyOpts) -> diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index d336fb6a3e29..0fe0b3b42243 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -7,7 +7,6 @@ -module(rabbit_web_mqtt_handler). -behaviour(cowboy_websocket). --behaviour(cowboy_sub_protocol). -include_lib("kernel/include/logger.hrl"). -include_lib("rabbit_common/include/logging.hrl"). @@ -25,15 +24,10 @@ -export([conserve_resources/3]). -export([info/2]). -%% cowboy_sub_protocol --export([upgrade/4, - upgrade/5, - takeover/7]). - -define(APP, rabbitmq_web_mqtt). -record(state, { - socket :: {rabbit_proxy_socket, any(), any()} | rabbit_net:socket(), + socket :: rabbit_net:web_socket(), parse_state = rabbit_mqtt_packet:init_state() :: rabbit_mqtt_packet:state(), proc_state = connect_packet_unprocessed :: connect_packet_unprocessed | rabbit_mqtt_processor:state(), @@ -53,47 +47,24 @@ -define(CLOSE_PROTOCOL_ERROR, 1002). -define(CLOSE_UNACCEPTABLE_DATA_TYPE, 1003). -%% cowboy_sub_protcol -upgrade(Req, Env, Handler, HandlerState) -> - upgrade(Req, Env, Handler, HandlerState, #{}). - -upgrade(Req, Env, Handler, HandlerState, Opts) -> - cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts). - -takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) -> - Sock = case HandlerState#state.socket of - undefined -> - Socket; - ProxyInfo -> - {rabbit_proxy_socket, Socket, ProxyInfo} - end, - cowboy_websocket:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, - {Handler, HandlerState#state{socket = Sock}}). - %% cowboy_websocket -init(Req, Opts) -> - case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of - undefined -> - no_supported_sub_protocol(undefined, Req); - Protocol -> - case lists:member(<<"mqtt">>, Protocol) of - false -> - no_supported_sub_protocol(Protocol, Req); - true -> - WsOpts0 = proplists:get_value(ws_opts, Opts, #{}), - WsOpts = maps:merge(#{compress => true}, WsOpts0), - ShouldUseFHC = application:get_env(?APP, use_file_handle_cache, true), - case ShouldUseFHC of - true -> ?LOG_INFO("Web MQTT: file handle cache use is enabled"); - false -> ?LOG_INFO("Web MQTT: file handle cache use is disabled") - end, - - {?MODULE, - cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req), - #state{socket = maps:get(proxy_header, Req, undefined), should_use_fhc = ShouldUseFHC}, - WsOpts} - end - end. +init(Req, _) -> + ?LOG_ERROR("Web MQTT: ~p", [Req]), + ShouldUseFHC = application:get_env(?APP, use_file_handle_cache, true), + case ShouldUseFHC of + true -> ?LOG_INFO("Web MQTT: file handle cache use is enabled"); + false -> ?LOG_INFO("Web MQTT: file handle cache use is disabled") + end, + State = #state{ + socket = #{ + peer => maps:get(peer, Req), + sock => maps:get(sock, Req), + cert => maps:get(cert, Req), + proxy_header => maps:get(proxy_header, Req, undefined) + }, + should_use_fhc = ShouldUseFHC + }, + {rabbit_web_ws_h, Req, State}. %% We cannot use a gen_server call, because the handler process is a %% special cowboy_websocket process (not a gen_server) which assumes diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 9dd90b0e03e6..8e0c5f59eda7 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -113,8 +113,8 @@ dep_toke = git_rmq toke $(current_rmq_ref # possible to work with rabbitmq-public-umbrella. dep_accept = hex 0.3.5 -dep_cowboy = hex 2.12.0 -dep_cowlib = hex 2.13.0 +dep_cowboy = git https://github.com/ninenines/cowboy master +dep_cowlib = git https://github.com/ninenines/cowlib master dep_credentials_obfuscation = hex 3.4.0 dep_khepri = hex 0.12.1 dep_khepri_mnesia_migration = hex 0.4.0 From 2543cbbec3af2f211377d58f37df03ce49768d52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 11 Apr 2024 16:29:28 +0200 Subject: [PATCH 2/2] Convert Web-MQTT-Examples to rabbit_web --- .../src/rabbit_mqtt_keepalive.erl | 3 ++ deps/rabbitmq_web_dispatch/src/rabbit_web.erl | 37 +++++++++++++++++-- .../src/rabbit_web_mqtt_app.erl | 3 +- .../src/rabbit_web_mqtt_examples_app.erl | 9 +++-- 4 files changed, 43 insertions(+), 9 deletions(-) diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_keepalive.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_keepalive.erl index 66be10a24744..82290d2e0784 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_keepalive.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_keepalive.erl @@ -28,6 +28,9 @@ start(0, _Sock) -> ok; %% Temporarily disable the keep-alive mechanism for WebSocket. %% @todo Implement an alternative that doesn't require polling the socket. +%% That can be done with the usual timers on receiving data +%% in rabbit_mqtt_processor:process_packet/2. The change should +%% be benchmarked against the previous mechanism as well. start(_, #{} = _WebSocket) -> ok; start(Seconds, Sock) diff --git a/deps/rabbitmq_web_dispatch/src/rabbit_web.erl b/deps/rabbitmq_web_dispatch/src/rabbit_web.erl index b0379d9f9717..592b09bf0cb5 100644 --- a/deps/rabbitmq_web_dispatch/src/rabbit_web.erl +++ b/deps/rabbitmq_web_dispatch/src/rabbit_web.erl @@ -22,6 +22,7 @@ -export([start_listeners/3]). -export([get_websocket_config_for_ref/1]). +-export([add_routes_to_listeners/2]). -export([ws_route/1]). %% We create listeners in sets. From the configuration @@ -195,7 +196,8 @@ get_websocket_config_for_ref({ThisBaseRef, _, _, _}) -> %% It is occasionally necessary to update the list of routes %% for Web listeners. For example a use case could be for %% the Web-MQTT example application to add its routes to -%% the existing Web-MQTT listener. +%% the existing Web-MQTT listener. Another use case is +%% a shared Web listener for all Web applications. %% %% Because listeners can be configured in sets (due to %% starting two listeners when using a dual IPv4/v6 stack), @@ -203,9 +205,38 @@ get_websocket_config_for_ref({ThisBaseRef, _, _, _}) -> %% different routes on the same set of listeners, this %% function takes only a BaseRef and applies the changes %% to all the listeners it finds. +%% +%% Routes are expected to be unique and conflict-free. +%% They are added at the end of the existing routes. +%% An exception is made for the Websocket endpoint: +%% there is no need to ensure the route is unique +%% when using the ws_route/1 function. + +add_routes_to_listeners(BaseRef, AddedDispatch) -> + %% First we must get the list of listeners. + %% We query the ranch_server table directly. + %% Since we are already querying ranch_server, + %% we retrieve the protocol options at the same time. + Listeners = ets:select(ranch_server, [ + {{{proto_opts, '$1'}, '$2'}, [{'==', BaseRef, {element, 1, '$1'}}], [{{'$1', '$2'}}]} + ]), + %% Then we go over each listener, updating the + %% routes one at a time. + lists:foreach(fun({Ref, ProtoOpts0}) -> + #{env := Env = #{dispatch := Dispatch0}} = ProtoOpts0, + Dispatch = merge_dispatches(Dispatch0, AddedDispatch), + ProtoOpts = ProtoOpts0#{env => Env#{dispatch => Dispatch}}, + ranch:set_protocol_options(Ref, ProtoOpts) + end, Listeners), + ok. + +%% We currently expect the dispatch lists to use '_' +%% for the host part of the dispatch. This code will +%% need to be updated should we ever specify anything +%% other than '_' for the host. -%update_listener_routes(BaseRef, fun(Dispatch) -> Dispatch end) -% todo. +merge_dispatches([{'_', [], Paths1}], [{'_', [], Paths2}]) -> + [{'_', [], Paths1 ++ Paths2}]. %% We use a common module to initiate all Websocket connections. %% This module will select the appropriate Websocket handler diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl index 49b6ca97a9cf..14ecf7c62dcd 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_app.erl @@ -84,8 +84,7 @@ mqtt_init() -> TcpConfig = get_env(tcp_config, []), % SslConfig = get_env(ssl_config, []), Routes = cowboy_router:compile([{'_', [ - rabbit_web:ws_route(get_env(ws_path, "/ws")), - {"/web-mqtt-examples/[...]", cowboy_static, {priv_dir, rabbitmq_web_mqtt_examples, "", []}} + rabbit_web:ws_route(get_env(ws_path, "/ws")) ]}]), CowboyOpts0 = maps:from_list(get_env(cowboy_opts, [])), CowboyOpts = CowboyOpts0#{ diff --git a/deps/rabbitmq_web_mqtt_examples/src/rabbit_web_mqtt_examples_app.erl b/deps/rabbitmq_web_mqtt_examples/src/rabbit_web_mqtt_examples_app.erl index 3eca741d98a4..33d3e511f358 100644 --- a/deps/rabbitmq_web_mqtt_examples/src/rabbit_web_mqtt_examples_app.erl +++ b/deps/rabbitmq_web_mqtt_examples/src/rabbit_web_mqtt_examples_app.erl @@ -16,13 +16,14 @@ -export([init/1]). start(_Type, _StartArgs) -> - {ok, Listener} = application:get_env(rabbitmq_web_mqtt_examples, listener), - {ok, _} = rabbit_web_dispatch:register_static_context( - web_mqtt_examples, Listener, "web-mqtt-examples", ?MODULE, - "priv", "WEB-MQTT: examples"), + Routes = cowboy_router:compile([{'_', [ + {"/web-mqtt-examples/[...]", cowboy_static, {priv_dir, rabbitmq_web_mqtt_examples, "", []}} + ]}]), + rabbit_web:add_routes_to_listeners(rabbit_web_mqtt, Routes), supervisor:start_link({local, ?MODULE}, ?MODULE, []). stop(_State) -> + %% @todo Remove routes. rabbit_web_dispatch:unregister_context(web_mqtt_examples), ok.