Skip to content

Commit

Permalink
Handled client's receive maximum property to avoid publish flooding (#…
Browse files Browse the repository at this point in the history
…858)

Handle client's receive maximum property to configure the inflight window through the client.

The main change contained in this PR is usage of a `sendQuota` in the `MQTTConnection` to keep the inflight zone size instead of `Session`'s `inflightSlots`. The method used to resend inflight messages has been updated to respect the send quota (aka `receive_maximum` property ) expressed by the client on CONNECT message.
If the inflight space is bigger than the actual send quota, maybe because the client squeezed it on a reconnection, it partition the inflight to fill up the send quota. The resend of inflight take precedence over queue drain both on processing of QoS1 PUBACK and QoS2 PURBEC.
If client connects through MQTT 3 the hardcoded limit (10) is used for inflight zone.
If it connects with MQTT5 it reads the `receive_maximum` property or set to default 65535 default value if not specified.
  • Loading branch information
andsel authored Oct 26, 2024
1 parent 4c0a0f0 commit 0d22c0d
Show file tree
Hide file tree
Showing 16 changed files with 309 additions and 164 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858)
[feature] Generate correct MANIFEST.MF with bnd-maven-plugin. (#848)
[feature] Flow-control: implemented publish's quota management on the server side. (#852)
[fix] Incorrect reference used in compareAndSet in CTrie.cleanTomb. (#841)
Expand Down
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public final class BrokerConstants {
public static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
public static final int INFLIGHT_WINDOW_SIZE = 10;
public static final int INFINITE_SESSION_EXPIRY = 0xFFFFFFFF;
public static final int RECEIVE_MAXIMUM = 65 * 1024;

private BrokerConstants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class BrokerConfiguration {
}
}

receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, Integer.MAX_VALUE);
receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM);
}

// test method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void run() {
// Writer is idle - set a new timeout and notify the callback.
resenderTimeout = ctx.executor().schedule(this, resenderTimeNanos, TimeUnit.NANOSECONDS);
try {
resendNotAcked(ctx/* , event */);
resendNotAcked(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
Expand Down
21 changes: 14 additions & 7 deletions broker/src/main/java/io/moquette/broker/LimitedQuota.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package io.moquette.broker;

import io.moquette.BrokerConstants;

class LimitedQuota implements Quota {
private int receiveMaximum;
private final int receiveMaximum;
private int receivedQuota;

public LimitedQuota(int receiveMaximum) {
Expand All @@ -29,33 +31,38 @@ public LimitedQuota(int receiveMaximum) {

@Override
public boolean hasLimit() {
return true;
return receiveMaximum != BrokerConstants.RECEIVE_MAXIMUM;
}

@Override
public void decrement() {
public void consumeSlot() {
assert receivedQuota > 0;
receivedQuota--;
}

@Override
public void increment() {
public void releaseSlot() {
receivedQuota++;
assert receivedQuota <= receiveMaximum;
}

@Override
public boolean isConsumed() {
return receivedQuota == 0;
public boolean hasFreeSlots() {
return receivedQuota != 0;
}

@Override
public int getMaximum() {
return receiveMaximum;
}

@Override
public int availableSlots() {
return receivedQuota;
}

@Override
public String toString() {
return "limited quota to " + receivedQuota;
return "limited quota to " + receivedQuota + " max slots: " + receiveMaximum;
}
}
56 changes: 40 additions & 16 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLPeerUnverifiedException;

import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
Expand All @@ -73,6 +72,7 @@ final class MQTTConnection {
private Session bindedSession;
private int protocolVersion;
private Quota receivedQuota;
private Quota sendQuota;

MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator,
SessionRegistry sessionRegistry, PostOffice postOffice) {
Expand Down Expand Up @@ -167,11 +167,11 @@ private void processPubAck(MqttMessage msg) {
* Factory method
* */
public static Quota createQuota(int receiveMaximum) {
if (receiveMaximum == Integer.MAX_VALUE) {
return new UnlimitedQuota();
} else {
return new LimitedQuota(receiveMaximum);
}
return new LimitedQuota(receiveMaximum);
}

Quota sendQuota() {
return sendQuota;
}

PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
Expand All @@ -190,7 +190,7 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
}
final boolean cleanSession = msg.variableHeader().isCleanSession();
final boolean serverGeneratedClientId;
if (clientId == null || clientId.length() == 0) {
if (clientId == null || clientId.isEmpty()) {
if (isNotProtocolVersion(msg, MqttVersion.MQTT_5)) {
if (!brokerConfig.isAllowZeroByteClientId()) {
LOG.info("Broker doesn't permit MQTT empty client ID. Username: {}", username);
Expand Down Expand Up @@ -227,6 +227,8 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
}
receivedQuota = createQuota(brokerConfig.receiveMaximum());

sendQuota = retrieveSendQuota(msg);

final String sessionId = clientId;
protocolVersion = msg.variableHeader().version();
return postOffice.routeCommand(clientId, "CONN", () -> {
Expand All @@ -236,6 +238,28 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
});
}

private Quota retrieveSendQuota(MqttConnectMessage msg) {
if (isProtocolVersion(msg, MqttVersion.MQTT_3_1) || isProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {
// for protocol versions that didn't define explicit
// the receiver maximum and without specification of flow control
// define one by the default.
return createQuota(BrokerConstants.INFLIGHT_WINDOW_SIZE);
}

MqttProperties.IntegerProperty receiveMaximumProperty = (MqttProperties.IntegerProperty) msg.variableHeader()
.properties()
.getProperty(MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value());
if (receiveMaximumProperty == null) {
return createQuota(BrokerConstants.RECEIVE_MAXIMUM);
}
return createQuota(receiveMaximumProperty.value());
}

// only for test
protected void assignSendQuota(Quota quota) {
this.sendQuota = quota;
}

private void checkMatchSessionLoop(String clientId) {
if (!sessionLoopDebug) {
return;
Expand Down Expand Up @@ -342,6 +366,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
LOG.trace("dispatch connection: {}", msg);
}
} else {
// here the session should still be in CONNECTING state
sessionRegistry.connectionClosed(bindedSession);
LOG.error("CONNACK send failed, cleanup session and close the connection", future.cause());
channel.close();
Expand Down Expand Up @@ -395,7 +420,6 @@ private ConnAckPropertiesBuilder prepareConnAckPropertiesBuilder(boolean serverG

builder
.sessionExpiryInterval(BrokerConstants.INFINITE_SESSION_EXPIRY)
.receiveMaximum(INFLIGHT_WINDOW_SIZE)
.retainAvailable(true)
.wildcardSubscriptionAvailable(true)
.subscriptionIdentifiersAvailable(true)
Expand Down Expand Up @@ -670,8 +694,8 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
return null;
}).ifFailed(msg::release);
case AT_LEAST_ONCE:
if (receivedQuota.isConsumed()) {
LOG.info("Client {} exceeded the quota {} processing QoS1, disconnecting it", clientId, receivedQuota);
if (!receivedQuota.hasFreeSlots()) {
LOG.warn("Client {} exceeded the quota {} processing QoS1, disconnecting it", clientId, receivedQuota);
brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED);
disconnectSession();
dropConnection();
Expand All @@ -682,16 +706,16 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
receivedQuota.decrement();
receivedQuota.consumeSlot();
postOffice.receivedPublishQos1(this, username, messageID, msg, expiry)
.completableFuture().thenRun(() -> {
receivedQuota.increment();
receivedQuota.releaseSlot();
});
return null;
}).ifFailed(msg::release);
case EXACTLY_ONCE: {
if (receivedQuota.isConsumed()) {
LOG.info("Client {} exceeded the quota {} processing QoS2, disconnecting it", clientId, receivedQuota);
if (!receivedQuota.hasFreeSlots()) {
LOG.warn("Client {} exceeded the quota {} processing QoS2, disconnecting it", clientId, receivedQuota);
brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED);
disconnectSession();
dropConnection();
Expand All @@ -703,7 +727,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
if (!isBoundToSession())
return null;
bindedSession.receivedPublishQos2(messageID, msg);
receivedQuota.decrement();
receivedQuota.consumeSlot();
return null;
});
if (!firstStepResult.isSuccess()) {
Expand Down Expand Up @@ -761,7 +785,7 @@ private void processPubRel(MqttMessage msg) {
checkMatchSessionLoop(clientID);
executePubRel(messageID);
// increment send quota after send PUBCOMP to the client
receivedQuota.increment();
receivedQuota.releaseSlot();
return null;
});
}
Expand Down
8 changes: 5 additions & 3 deletions broker/src/main/java/io/moquette/broker/Quota.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ interface Quota {

boolean hasLimit();

void decrement();
void consumeSlot();

void increment();
void releaseSlot();

boolean isConsumed();
boolean hasFreeSlots();

int getMaximum();

int availableSlots();
}
Loading

0 comments on commit 0d22c0d

Please sign in to comment.