Skip to content

Commit

Permalink
WIP: OF-2559 Added TLS handler for inbound netty connections
Browse files Browse the repository at this point in the history
  • Loading branch information
viv committed Jul 3, 2023
1 parent 85ca924 commit adbf081
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,18 @@

package org.jivesoftware.openfire.nio;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.compression.JZlibDecoder;
import io.netty.handler.codec.compression.JZlibEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.mina.core.buffer.IoBuffer;
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.ConnectionCloseListener;
import org.jivesoftware.openfire.PacketDeliverer;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.StanzaHandler;
import org.jivesoftware.openfire.net.StartTlsFilter;
import org.jivesoftware.openfire.session.LocalSession;
import org.jivesoftware.openfire.session.Session;
import org.jivesoftware.openfire.spi.ConnectionConfiguration;
Expand All @@ -44,7 +39,6 @@

import javax.annotation.Nullable;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -60,7 +54,6 @@
import java.util.concurrent.locks.ReentrantLock;

import static com.jcraft.jzlib.JZlib.Z_BEST_COMPRESSION;
import static org.jivesoftware.openfire.spi.ConnectionManagerImpl.*;

/**
* Implementation of {@link Connection} interface specific for Netty connections.
Expand Down Expand Up @@ -398,18 +391,21 @@ public void deliverRawText(String text) {
public void startTLS(boolean clientMode, boolean directTLS) throws Exception {

final EncryptionArtifactFactory factory = new EncryptionArtifactFactory( configuration );
// TODO implement ssl filter
SslContext sslContext = factory.createSslContext();

// final SslFilter filter;
final SslHandler sslHandler = sslContext.newHandler(channelHandlerContext.alloc());
if ( clientMode ) {
// filter = factory.createClientModeSslFilter();
} else {
// filter = factory.createServerModeSslFilter();
}

// ioSession.getFilterChain().addBefore(EXECUTOR_FILTER_NAME, TLS_FILTER_NAME, filter);
channelHandlerContext.pipeline().addFirst(sslHandler);

if (!directTLS) {
// ioSession.getFilterChain().addAfter(TLS_FILTER_NAME, STARTTLS_FILTER_NAME, new StartTlsFilter());
// channelHandlerContext.pipeline().addFirst(factory.createSslHandler(channelHandlerContext.alloc()));
}

if ( !clientMode && !directTLS ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ public void handlerRemoved(ChannelHandlerContext ctx) {

@Override
public void channelRead0(ChannelHandlerContext ctx, String message) {
// org.jivesoftware.openfire.nio.ConnectionHandler.messageReceived

// Get the parser to use to process stanza. For optimization there is going
// to be a parser for each running thread. Each Filter will be executed
// by the Executor placed as the first Filter. So we can have a parser associated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public static final class Client {
public static final String LOGIN_ANONYM_ALLOWED = "xmpp.client.login.allowedAnonym";

public static final String MAX_THREADS = "xmpp.client.processing.threads";

/**
* Used to configure throttling at the network level
*
* @deprecated not implemented for Netty as this uses max of 1 message per read instead
*/
public static final String MAX_READ_BUFFER = "xmpp.client.maxReadBufferSize";

public static final String MAX_THREADS_SSL = "xmpp.client_ssl.processing.threads";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.jivesoftware.openfire.spi;

import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import org.apache.mina.filter.ssl.SslFilter;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.jivesoftware.openfire.keystore.OpenfireX509TrustManager;
Expand Down Expand Up @@ -324,6 +327,29 @@ public synchronized SslContextFactory getSslContextFactory()
}
}

public SslContext createSslContext() throws UnrecoverableKeyException, NoSuchAlgorithmException, KeyStoreException, SSLException {
getKeyManagers();
SslContextBuilder builder = SslContextBuilder.forServer(keyManagerFactory);

// Set policy for checking client certificates.
switch ( configuration.getClientAuth() )
{
case disabled:
builder.clientAuth(ClientAuth.NONE);
break;
case wanted:
builder.clientAuth(ClientAuth.OPTIONAL);
break;
case needed:
builder.clientAuth(ClientAuth.REQUIRE);
break;
}

builder.startTls(true);

return builder.build();
}

/**
* Creates an Apache MINA SslFilter that is configured to use server mode when handshaking.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,19 @@

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.jivesoftware.openfire.Connection;
import org.jivesoftware.openfire.nio.NettyClientConnectionHandler;
import org.jivesoftware.openfire.nio.NettyConnectionHandler;
import org.jivesoftware.openfire.nio.NettyServerConnectionHandler;
import org.jivesoftware.openfire.nio.NettyXMPPDecoder;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.SystemProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
* This class is responsible for accepting new (socket) connections, using Java NIO implementation provided by the
Expand All @@ -35,25 +25,25 @@
*/
class NettyConnectionAcceptor extends ConnectionAcceptor {
// NioEventLoopGroup is a multithreaded event loop that handles I/O operation.
// Netty provides various EventLoopGroup implementations for different kind of transports.
// We are implementing a server-side application in this example, and therefore two
// NioEventLoopGroup will be used. The first one, often called 'boss', accepts an incoming connection.
// The first one, often called 'boss', accepts an incoming connection.
// The second one, often called 'worker', handles the traffic of the accepted connection once the boss
// accepts the connection and registers the accepted connection to the worker. How many Threads are
// used and how they are mapped to the created Channels depends on the EventLoopGroup implementation
// and may be even configurable via a constructor.

/**
* Controls the write timeout time in seconds to handle stalled sessions and prevent DoS
* A multithreaded event loop that handles I/O operation
* <p>
* The 'boss' accepts an incoming connection.
*/
public static final SystemProperty<Duration> WRITE_TIMEOUT_SECONDS = SystemProperty.Builder.ofType(Duration.class)
.setKey("xmpp.socket.write-timeout-seconds")
.setDefaultValue(Duration.ofSeconds(30))
.setChronoUnit(ChronoUnit.SECONDS)
.setDynamic(true)
.build();

private static final EventLoopGroup BOSS_GROUP = new NioEventLoopGroup();

/**
* A multithreaded event loop that handles I/O operation
* <p>
* The 'worker', handles the traffic of the accepted connection once the boss accepts the connection
* and registers the accepted connection to the worker.
*/
private static final EventLoopGroup WORKER_GROUP = new NioEventLoopGroup();
private final Logger Log;
private final NettyConnectionHandler connectionHandler;
Expand Down Expand Up @@ -99,45 +89,6 @@ public NettyConnectionAcceptor(ConnectionConfiguration configuration) {
// }
}

private static NioSocketAcceptor buildSocketAcceptor() {

// TODO consider configuring netty with the settings below (i.e. find the netty way of doing this)

// Create SocketAcceptor with correct number of processors
final int processorCount = JiveGlobals.getIntProperty("xmpp.processor.count", Runtime.getRuntime().availableProcessors());

final NioSocketAcceptor socketAcceptor = new NioSocketAcceptor(processorCount);

// Set that it will be possible to bind a socket if there is a connection in the timeout state.
socketAcceptor.setReuseAddress(true);

// Set the listen backlog (queue) length. Default is 50.
socketAcceptor.setBacklog(JiveGlobals.getIntProperty("xmpp.socket.backlog", 50));

// Set default (low level) settings for new socket connections
final SocketSessionConfig socketSessionConfig = socketAcceptor.getSessionConfig();

//socketSessionConfig.setKeepAlive();
final int receiveBuffer = JiveGlobals.getIntProperty("xmpp.socket.buffer.receive", -1);
if (receiveBuffer > 0) {
socketSessionConfig.setReceiveBufferSize(receiveBuffer);
}

final int sendBuffer = JiveGlobals.getIntProperty("xmpp.socket.buffer.send", -1);
if (sendBuffer > 0) {
socketSessionConfig.setSendBufferSize(sendBuffer);
}

final int linger = JiveGlobals.getIntProperty("xmpp.socket.linger", -1);
if (linger > 0) {
socketSessionConfig.setSoLinger(linger);
}

socketSessionConfig.setTcpNoDelay(JiveGlobals.getBooleanProperty("xmpp.socket.tcp-nodelay", socketSessionConfig.isTcpNoDelay()));

return socketAcceptor;
}

/**
* Starts this acceptor by binding the socket acceptor. When the acceptor is already started, a warning will be
* logged and the method invocation is otherwise ignored.
Expand All @@ -147,42 +98,23 @@ public synchronized void start() {
System.out.println("Running Netty on port: " + getPort());

try {
// ServerBootstrap is a helper class that sets up a server. You can set up the server using
// a Channel directly. However, please note that this is a tedious process, and you do not
// need to do that in most cases.
// ServerBootstrap is a helper class that sets up a server
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(BOSS_GROUP, WORKER_GROUP)
// Here, we specify to use the NioServerSocketChannel class which is used to
// instantiate a new Channel to accept incoming connections.
// Instantiate a new Channel to accept incoming connections.
.channel(NioServerSocketChannel.class)
// The handler specified here will always be evaluated by a newly accepted Channel.
// The ChannelInitializer is a special handler that is purposed to help a user configure
// a new Channel. It is most likely that you want to configure the ChannelPipeline of the
// new Channel by adding some handlers such as DiscardServerHandler to implement your
// network application. As the application gets complicated, it is likely that you will add
// more handlers to the pipeline and extract this anonymous class into a top-level
// class eventually.
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyXMPPDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast("stalledSessionHandler", new WriteTimeoutHandler((int)WRITE_TIMEOUT_SECONDS.getValue().getSeconds()));
ch.pipeline().addLast(connectionHandler);
}
})
// You can also set the parameters which are specific to the Channel implementation.
// We are writing a TCP/IP server, so we are allowed to set the socket options such as
// tcpNoDelay and keepAlive. Please refer to the apidocs of ChannelOption and the specific
// ChannelConfig implementations to get an overview about the supported ChannelOptions.

.childHandler(new NettyServerInitializer(connectionHandler))
// Set the listen backlog (queue) length.
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_BACKLOG, JiveGlobals.getIntProperty("xmpp.socket.backlog", 50))
// option() is for the NioServerSocketChannel that accepts incoming connections.
// childOption() is for the Channels accepted by the parent ServerChannel,
// which is NioSocketChannel in this case.
.childOption(ChannelOption.SO_KEEPALIVE, true);

.childOption(ChannelOption.SO_KEEPALIVE, true)
// Setting TCP_NODELAY to false enables the Nagle algorithm, which delays sending small successive packets
.childOption(ChannelOption.TCP_NODELAY, JiveGlobals.getBooleanProperty( "xmpp.socket.tcp-nodelay", true))
// Set that it will be possible to bind a socket if there is a connection in the timeout state.
.childOption(ChannelOption.SO_REUSEADDR, true);

final int sendBuffer = JiveGlobals.getIntProperty( "xmpp.socket.buffer.send", -1 );
if ( sendBuffer > 0 ) {
Expand All @@ -199,27 +131,16 @@ public void initChannel(SocketChannel ch) throws Exception {
serverBootstrap.childOption(ChannelOption.SO_LINGER, linger);
}

serverBootstrap.childOption(ChannelOption.TCP_NODELAY, JiveGlobals.getBooleanProperty( "xmpp.socket.tcp-nodelay", true));

// Set that it will be possible to bind a socket if there is a connection in the timeout state.
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);

// We do not need to throttle sessions for Netty
if ( configuration.getMaxBufferSize() > 0 ) {
Log.warn( "Throttling by using max buffer size not implemented for Netty; a maximum of 1 message per read is implemented instead.");
}


// Bind to the port and start the server to accept incoming connections.
// You can now call the bind() method as many times as you want (with different bind addresses.)
this.mainChannel = serverBootstrap.bind(
new InetSocketAddress(configuration.getBindAddress(),
configuration.getPort()))
new InetSocketAddress(
configuration.getBindAddress(),
configuration.getPort())
)
.sync()
.channel();

} catch (InterruptedException e) {
System.err.println("Error starting " + configuration.getPort() + ": " + e.getMessage());
Log.error("Error starting: " + configuration.getPort(), e);
closeMainChannel();
}
Expand All @@ -230,10 +151,19 @@ public void initChannel(SocketChannel ch) throws Exception {
*/
@Override
public synchronized void stop() {
System.out.println("Stop called for port: " + getPort());
closeMainChannel();
}

/**
* Close the main channel (this is not synchronous and does not verify the channel has closed).
*/
private void closeMainChannel() {
if (this.mainChannel != null) {
Log.info("Closing channel " + mainChannel);
mainChannel.close();
}
}

/**
* Shuts down event loop groups if they are not already shutdown - this will close all channels.
*/
Expand Down Expand Up @@ -263,16 +193,6 @@ public synchronized void reconfigure(ConnectionConfiguration configuration) {
// TODO reconfigure the netty connection
}

/**
* Close the main channel (this is not synchronous and does not verify the channel has closed).
*/
public void closeMainChannel() {
if (this.mainChannel != null) {
Log.info("Closing channel " + mainChannel);
mainChannel.close();
}
}

public synchronized int getPort() {
return configuration.getPort();
}
Expand Down
Loading

0 comments on commit adbf081

Please sign in to comment.