Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OF-1112: Switch to MINA for inbound S2S #2159

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions i18n/src/main/resources/openfire_i18n.properties
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,7 @@ system_property.xmpp.auth.ssl.context_protocol=The TLS protocol to use for encry
system_property.xmpp.socket.ssl.active=Set to true to enable legacy encrypted connections for clients, otherwise false
system_property.xmpp.component.ssl.active=Set to true to enable legacy encrypted connections for external components, otherwise false
system_property.xmpp.server.startup.retry.delay=Set to a positive value to allow a retry of a failed startup after the specified duration.
system_property.xmpp.server.enable-old-io=When true, switches Openfire back to using the legacy network IO implementation for server-to-server communication.
system_property.sasl.realm=The realm used for SASL authentication, which can be used when realms that are passed through SASL are different from the XMPP domain name.
system_property.sasl.approvedRealms=A collection of realm names that can be used for SASL authentication. This can be used when realms that are passed through SASL are different from the XMPP domain name.
system_property.sasl.proxyAuth=Controls if Openfire's default authorization policy allows authentication identities (identity whose password will be used) that are different from authorization identities (identity to act as).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.nio.channels.AsynchronousCloseException;

import org.dom4j.Element;
import org.dom4j.QName;
import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,7 +41,9 @@
* sequentially.
*
* @author Gaston Dombiak
* @deprecated Old, pre NIO / MINA code. Should not be used as NIO offers better performance
*/
@Deprecated
class BlockingReadingMode extends SocketReadingMode {

private static final Logger Log = LoggerFactory.getLogger(BlockingReadingMode.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
* by changing the property <b>xmpp.server.processing.max.threads</b>.
*
* @author Gaston Dombiak
* @deprecated Old, pre NIO / MINA code. Should not be used as NIO offers better performance. Currently only in use for s2s.
*/
@Deprecated
public class ServerSocketReader extends SocketReader {

private static final Logger Log = LoggerFactory.getLogger(ServerSocketReader.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
* The connection used for receiving packets will use a ServerStanzaHandler since the other
* connection will not receive packets.<p>
*
* TODO Finish migration of s2s to use NIO instead of blocking threads. Migrate from ServerSocketReader.
*
* @author Gaston Dombiak
*/
public class ServerStanzaHandler extends StanzaHandler {
Expand All @@ -64,6 +62,7 @@ public ServerStanzaHandler(PacketRouter router, Connection connection) {

@Override
boolean processUnknowPacket(Element doc) throws UnauthorizedException {
Log.trace("Processing 'unknown' packet");
// Handle subsequent db:result packets
if ("db".equals(doc.getNamespacePrefix()) && "result".equals(doc.getName())) {
if (!((LocalIncomingServerSession) session).validateSubsequentDomain(doc)) {
Expand Down Expand Up @@ -98,12 +97,11 @@ boolean validateJIDs() {
@Override
boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection)
throws XmlPullParserException {
// TODO Finish implementation
/*if ("jabber:server".equals(namespace)) {
if ("jabber:server".equals(namespace)) {
// The connected client is a server so create an IncomingServerSession
session = LocalIncomingServerSession.createSession(serverName, reader, connection);
session = LocalIncomingServerSession.createSession(serverName, xpp, connection);
return true;
}*/
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
* stream element and will then keep reading and routing the received packets.
*
* @author Gaston Dombiak
* @deprecated Old, pre NIO / MINA code. Should not be used as NIO offers better performance
*/
@Deprecated
public abstract class SocketReader implements Runnable {

private static final Logger Log = LoggerFactory.getLogger(SocketReader.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
* Abstract class for {@link BlockingReadingMode}.
*
* @author Gaston Dombiak
* @deprecated Old, pre NIO / MINA code. Should not be used as NIO offers better performance
*/
@Deprecated
abstract class SocketReadingMode {

private static final Logger Log = LoggerFactory.getLogger(SocketReadingMode.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,36 +113,45 @@ public void setSession(LocalSession session) {
}

public void process(String stanza, XMPPPacketReader reader) throws Exception {
Log.trace("Processing: {}", stanza);
boolean initialStream = stanza.startsWith("<stream:stream");
if (!sessionCreated || initialStream) {
if (!initialStream) {
// Ignore <?xml version="1.0"?>
Log.trace("Ignore <?xml version=\"1.0\"?>");
return;
}
// Found an stream:stream tag...
Log.trace("Found an stream:stream tag...");
if (!sessionCreated) {
Log.trace("Creating session...");
sessionCreated = true;
MXParser parser = reader.getXPPParser();
parser.setInput(new StringReader(stanza));
createSession(parser);
}
else if (startedTLS) {
Log.trace("TLS established...");
startedTLS = false;
tlsNegotiated();
}
else if (startedSASL && saslStatus == SASLAuthentication.Status.authenticated) {
Log.trace("SASL established...");
startedSASL = false;
saslSuccessful();
}
else if (waitingCompressionACK) {
Log.trace("Compression established...");
waitingCompressionACK = false;
compressionSuccessful();
}
else {
Log.trace("Ignoring stanza.");
}
return;
}

// Verify if end of stream was requested
if (stanza.equals("</stream:stream>")) {
Log.trace("End of stream was requested. Formally closing a session, if it is still open. Session not null? {}", session != null);
if (session != null) {
session.getStreamManager().formalClose();
Log.debug( "Closing session as an end-of-stream was received: {}", session );
Expand All @@ -152,6 +161,7 @@ else if (waitingCompressionACK) {
}
// Ignore <?xml version="1.0"?> stanzas sent by clients
if (stanza.startsWith("<?xml")) {
Log.trace("Ignore <?xml version=\"1.0\"?> stanzas sent by clients");
return;
}

Expand All @@ -177,40 +187,43 @@ else if (waitingCompressionACK) {
}

if (doc == null) {
// No document found.
Log.trace("No document found.");
return;
}
String tag = doc.getName();
if ("starttls".equals(tag)) {
// Negotiate TLS
Log.trace("Negotiate TLS...");
if (negotiateTLS()) {
Log.trace("Started TLS negotiation.");
startedTLS = true;
}
else {
Log.trace("failure to start TLS negotiation.");
connection.close();
session = null;
}
}
else if ("auth".equals(tag)) {
// User is trying to authenticate using SASL
Log.trace("User is trying to authenticate using SASL. Process authentication stanza.");
startedSASL = true;
// Process authentication stanza
saslStatus = SASLAuthentication.handle(session, doc);
} else if (startedSASL && "response".equals(tag) || "abort".equals(tag)) {
// User is responding to SASL challenge. Process response
Log.trace("User is responding to SASL challenge. Process response");
saslStatus = SASLAuthentication.handle(session, doc);
}
else if ("compress".equals(tag)) {
// Client is trying to initiate compression
Log.trace("Client is trying to initiate compression");
if (compressClient(doc)) {
// Compression was successful so open a new stream and offer
// resource binding and session establishment (to client sessions only)
waitingCompressionACK = true;
}
} else if (isStreamManagementStanza(doc)) {
Log.trace("Client is sending stream management stasnza.");
session.getStreamManager().process( doc );
}
else {
Log.trace("Delegate processing of stanza to process()");
process(doc);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public static final class Client {

public static final class Server {

public static final SystemProperty<Boolean> USE_OLD_IO = SystemProperty.Builder.ofType(Boolean.class)
.setKey("xmpp.server.enable-old-io")
.setDefaultValue(false)
.setDynamic(false)
.build();

public static final String SOCKET_ACTIVE = "xmpp.server.socket.active";
public static final String PORT = "xmpp.server.socket.port";
public static final String OLD_SSLPORT = "xmpp.server.socket.ssl.port";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.jivesoftware.openfire.spi.ConnectionType;
import org.jivesoftware.util.CertificateManager;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.LocaleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmlpull.v1.XmlPullParser;
Expand All @@ -36,6 +37,7 @@
import org.xmpp.packet.Packet;
import org.xmpp.packet.StreamError;

import javax.annotation.Nonnull;
import java.io.IOException;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
Expand Down Expand Up @@ -70,6 +72,8 @@ public class LocalIncomingServerSession extends LocalServerSession implements In

private static final Logger Log = LoggerFactory.getLogger(LocalIncomingServerSession.class);

private static final String ETHERX_NAMESPACE = "http://etherx.jabber.org/streams";

/**
* List of domains, subdomains and virtual hostnames of the remote server that were
* validated with this server. The remote server is allowed to send packets to this
Expand All @@ -89,6 +93,138 @@ public class LocalIncomingServerSession extends LocalServerSession implements In
*/
private String fromDomain = null;

/**
* Creates a new session that will receive packets. The new session will be authenticated
* before being returned. If the authentication process fails then the answer will be
* {@code null}.
*
* @param serverName hostname of this server.
* @param xpp pull parser that is processing data sent by the remote server.
* @param connection the new established connection with the remote server.
* @throws org.xmlpull.v1.XmlPullParserException if an error occurs while parsing the XML.
*/
public static LocalIncomingServerSession createSession(@Nonnull final String serverName, @Nonnull final XmlPullParser xpp, @Nonnull final Connection connection) throws XmlPullParserException
{
if (!xpp.getName().equals("stream")) {
throw new XmlPullParserException(
LocaleUtils.getLocalizedString("admin.error.bad-stream"));
}

if (!xpp.getNamespace(xpp.getPrefix()).equals(ETHERX_NAMESPACE))
{
throw new XmlPullParserException(LocaleUtils.getLocalizedString(
"admin.error.bad-namespace"));
}

// FIXME: add a 'is allowed' check to see if the remote server isn't blocked.

String version = xpp.getAttributeValue("", "version");
String fromDomain = xpp.getAttributeValue("", "from");
String toDomain = xpp.getAttributeValue("", "to");
int[] serverVersion = version != null ? decodeVersion(version) : new int[] {0,0};

if (toDomain == null) {
toDomain = serverName;
}

// Retrieve list of namespaces declared in current element.
connection.setAdditionalNamespaces(XMPPPacketReader.getPrefixedNamespacesOnCurrentElement(xpp));

try {
// Get the stream ID for the new session
StreamID streamID = SessionManager.getInstance().nextStreamID();
Log.debug("Create a server Session for the remote domain: {}", fromDomain);
LocalIncomingServerSession session =
SessionManager.getInstance().createIncomingServerSession(connection, streamID, fromDomain);

// Send the stream header
StringBuilder openingStream = new StringBuilder();
openingStream.append("<stream:stream");
openingStream.append(" xmlns:db=\"jabber:server:dialback\"");
openingStream.append(" xmlns:stream=\"http://etherx.jabber.org/streams\"");
openingStream.append(" xmlns=\"jabber:server\"");
openingStream.append(" from=\"").append(toDomain).append("\"");
if (fromDomain != null) {
openingStream.append(" to=\"").append(fromDomain).append("\"");
}
openingStream.append(" id=\"").append(streamID).append("\"");

// OF-443: Not responding with a 1.0 version in the stream header when federating with older
// implementations appears to reduce connection issues with those domains (patch by Marcin Cieślak).
if (serverVersion[0] >= 1) {
openingStream.append(" version=\"1.0\">");
} else {
openingStream.append('>');
}

Log.trace("Sending open stream response: {}", openingStream);
connection.deliverRawText(openingStream.toString());

if (serverVersion[0] >= 1) {
// Indicate the TLS policy to use for this connection
Connection.TLSPolicy tlsPolicy = connection.getTlsPolicy();
boolean hasCertificates = false;
try {
hasCertificates = XMPPServer.getInstance().getCertificateStoreManager().getIdentityStore( ConnectionType.SOCKET_S2S ).getStore().size() > 0;
}
catch (Exception e) {
Log.error(e.getMessage(), e);
}
if (Connection.TLSPolicy.required == tlsPolicy && !hasCertificates) {
Log.error("Server session rejected. TLS is required but no certificates " +
"were created.");
return null;
}
connection.setTlsPolicy(hasCertificates ? tlsPolicy : Connection.TLSPolicy.disabled);
}

// Indicate the compression policy to use for this connection
connection.setCompressionPolicy( connection.getConfiguration().getCompressionPolicy() );

if (serverVersion[0] >= 1) {
Log.debug("Remote server is XMPP 1.0 compliant so offer TLS and SASL to establish the connection (and server dialback)");
StringBuilder sb = new StringBuilder();
sb.append("<stream:features>");

if (connection.getConfiguration().getTlsPolicy() != Connection.TLSPolicy.legacyMode) {
sb.append("<starttls xmlns=\"urn:ietf:params:xml:ns:xmpp-tls\">");
if (!ServerDialback.isEnabled()) {
// Server dialback is disabled so TLS is required
sb.append("<required/>");
}
sb.append("</starttls>");
}

// Include available SASL Mechanisms
sb.append(SASLAuthentication.getSASLMechanisms(session));

if (ServerDialback.isEnabled()) {
// Also offer server dialback (when TLS is not required). Server dialback may be offered
// after TLS has been negotiated and a self-signed certificate is being used
sb.append("<dialback xmlns=\"urn:xmpp:features:dialback\"><errors/></dialback>");
}

sb.append("</stream:features>");

Log.trace("Sending stream features: {}", openingStream);
connection.deliverRawText(sb.toString());

} else {
// Sending features to Openfire < 3.7.1 confuses it too - OF-443)
Log.debug("Don't offer stream-features to pre-1.0 servers, as it confuses them. Reported version: {}", version);
}

// Set the domain or subdomain of the local server targeted by the remote server
session.setLocalDomain(serverName);
return session;
}
catch (Exception e) {
Log.error("Error establishing connection from remote server: {}", connection, e);
connection.close(new StreamError(StreamError.Condition.internal_server_error));
return null;
}
}

/**
* Creates a new session that will receive packets. The new session will be authenticated
* before being returned. If the authentication process fails then the answer will be
Expand All @@ -103,7 +239,9 @@ public class LocalIncomingServerSession extends LocalServerSession implements In
* a Server Dialback authentication process.
* @throws org.xmlpull.v1.XmlPullParserException if an error occurs while parsing the XML.
* @throws java.io.IOException if an input/output error occurs while using the connection.
* @deprecated Old, pre NIO / MINA code. Should not be used as NIO offers better performance.
*/
@Deprecated
public static LocalIncomingServerSession createSession(String serverName, XMPPPacketReader reader,
SocketConnection connection, boolean directTLS) throws XmlPullParserException, IOException {
XmlPullParser xpp = reader.getXPPParser();
Expand Down
Loading