Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handled client's receive maximum property to avoid publish flooding #858

Merged
merged 6 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[feature] Flow-control: Handled client's receive maximum property to configure the inflight window through the client. (#858)
[feature] Generate correct MANIFEST.MF with bnd-maven-plugin. (#848)
[feature] Flow-control: implemented publish's quota management on the server side. (#852)
[fix] Incorrect reference used in compareAndSet in CTrie.cleanTomb. (#841)
Expand Down
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public final class BrokerConstants {
public static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
public static final int INFLIGHT_WINDOW_SIZE = 10;
public static final int INFINITE_SESSION_EXPIRY = 0xFFFFFFFF;
public static final int RECEIVE_MAXIMUM = 65 * 1024;

private BrokerConstants() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class BrokerConfiguration {
}
}

receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, Integer.MAX_VALUE);
receiveMaximum = props.intProp(IConfig.RECEIVE_MAXIMUM, BrokerConstants.RECEIVE_MAXIMUM);
}

// test method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void run() {
// Writer is idle - set a new timeout and notify the callback.
resenderTimeout = ctx.executor().schedule(this, resenderTimeNanos, TimeUnit.NANOSECONDS);
try {
resendNotAcked(ctx/* , event */);
resendNotAcked(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
Expand Down
21 changes: 14 additions & 7 deletions broker/src/main/java/io/moquette/broker/LimitedQuota.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package io.moquette.broker;

import io.moquette.BrokerConstants;

class LimitedQuota implements Quota {
private int receiveMaximum;
private final int receiveMaximum;
private int receivedQuota;

public LimitedQuota(int receiveMaximum) {
Expand All @@ -29,33 +31,38 @@ public LimitedQuota(int receiveMaximum) {

@Override
public boolean hasLimit() {
return true;
return receiveMaximum != BrokerConstants.RECEIVE_MAXIMUM;
}

@Override
public void decrement() {
public void consumeSlot() {
assert receivedQuota > 0;
receivedQuota--;
}

@Override
public void increment() {
public void releaseSlot() {
receivedQuota++;
assert receivedQuota <= receiveMaximum;
}

@Override
public boolean isConsumed() {
return receivedQuota == 0;
public boolean hasFreeSlots() {
return receivedQuota != 0;
}

@Override
public int getMaximum() {
return receiveMaximum;
}

@Override
public int availableSlots() {
return receivedQuota;
}

@Override
public String toString() {
return "limited quota to " + receivedQuota;
return "limited quota to " + receivedQuota + " max slots: " + receiveMaximum;
}
}
56 changes: 40 additions & 16 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLPeerUnverifiedException;

import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
Expand All @@ -73,6 +72,7 @@ final class MQTTConnection {
private Session bindedSession;
private int protocolVersion;
private Quota receivedQuota;
private Quota sendQuota;

MQTTConnection(Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator,
SessionRegistry sessionRegistry, PostOffice postOffice) {
Expand Down Expand Up @@ -167,11 +167,11 @@ private void processPubAck(MqttMessage msg) {
* Factory method
* */
public static Quota createQuota(int receiveMaximum) {
if (receiveMaximum == Integer.MAX_VALUE) {
return new UnlimitedQuota();
} else {
return new LimitedQuota(receiveMaximum);
}
return new LimitedQuota(receiveMaximum);
}

Quota sendQuota() {
return sendQuota;
}

PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
Expand All @@ -190,7 +190,7 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
}
final boolean cleanSession = msg.variableHeader().isCleanSession();
final boolean serverGeneratedClientId;
if (clientId == null || clientId.length() == 0) {
if (clientId == null || clientId.isEmpty()) {
if (isNotProtocolVersion(msg, MqttVersion.MQTT_5)) {
if (!brokerConfig.isAllowZeroByteClientId()) {
LOG.info("Broker doesn't permit MQTT empty client ID. Username: {}", username);
Expand Down Expand Up @@ -227,6 +227,8 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
}
receivedQuota = createQuota(brokerConfig.receiveMaximum());

sendQuota = retrieveSendQuota(msg);

final String sessionId = clientId;
protocolVersion = msg.variableHeader().version();
return postOffice.routeCommand(clientId, "CONN", () -> {
Expand All @@ -236,6 +238,28 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
});
}

private Quota retrieveSendQuota(MqttConnectMessage msg) {
if (isProtocolVersion(msg, MqttVersion.MQTT_3_1) || isProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {
// for protocol versions that didn't define explicit
// the receiver maximum and without specification of flow control
// define one by the default.
return createQuota(BrokerConstants.INFLIGHT_WINDOW_SIZE);
}

MqttProperties.IntegerProperty receiveMaximumProperty = (MqttProperties.IntegerProperty) msg.variableHeader()
.properties()
.getProperty(MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM.value());
if (receiveMaximumProperty == null) {
return createQuota(BrokerConstants.RECEIVE_MAXIMUM);
}
return createQuota(receiveMaximumProperty.value());
}

// only for test
protected void assignSendQuota(Quota quota) {
this.sendQuota = quota;
}

private void checkMatchSessionLoop(String clientId) {
if (!sessionLoopDebug) {
return;
Expand Down Expand Up @@ -342,6 +366,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
LOG.trace("dispatch connection: {}", msg);
}
} else {
// here the session should still be in CONNECTING state
sessionRegistry.connectionClosed(bindedSession);
LOG.error("CONNACK send failed, cleanup session and close the connection", future.cause());
channel.close();
Expand Down Expand Up @@ -395,7 +420,6 @@ private ConnAckPropertiesBuilder prepareConnAckPropertiesBuilder(boolean serverG

builder
.sessionExpiryInterval(BrokerConstants.INFINITE_SESSION_EXPIRY)
.receiveMaximum(INFLIGHT_WINDOW_SIZE)
.retainAvailable(true)
.wildcardSubscriptionAvailable(true)
.subscriptionIdentifiersAvailable(true)
Expand Down Expand Up @@ -670,8 +694,8 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
return null;
}).ifFailed(msg::release);
case AT_LEAST_ONCE:
if (receivedQuota.isConsumed()) {
LOG.info("Client {} exceeded the quota {} processing QoS1, disconnecting it", clientId, receivedQuota);
if (!receivedQuota.hasFreeSlots()) {
LOG.warn("Client {} exceeded the quota {} processing QoS1, disconnecting it", clientId, receivedQuota);
brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED);
disconnectSession();
dropConnection();
Expand All @@ -682,16 +706,16 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
return null;
receivedQuota.decrement();
receivedQuota.consumeSlot();
postOffice.receivedPublishQos1(this, username, messageID, msg, expiry)
.completableFuture().thenRun(() -> {
receivedQuota.increment();
receivedQuota.releaseSlot();
});
return null;
}).ifFailed(msg::release);
case EXACTLY_ONCE: {
if (receivedQuota.isConsumed()) {
LOG.info("Client {} exceeded the quota {} processing QoS2, disconnecting it", clientId, receivedQuota);
if (!receivedQuota.hasFreeSlots()) {
LOG.warn("Client {} exceeded the quota {} processing QoS2, disconnecting it", clientId, receivedQuota);
brokerDisconnect(MqttReasonCodes.Disconnect.RECEIVE_MAXIMUM_EXCEEDED);
disconnectSession();
dropConnection();
Expand All @@ -703,7 +727,7 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
if (!isBoundToSession())
return null;
bindedSession.receivedPublishQos2(messageID, msg);
receivedQuota.decrement();
receivedQuota.consumeSlot();
return null;
});
if (!firstStepResult.isSuccess()) {
Expand Down Expand Up @@ -761,7 +785,7 @@ private void processPubRel(MqttMessage msg) {
checkMatchSessionLoop(clientID);
executePubRel(messageID);
// increment send quota after send PUBCOMP to the client
receivedQuota.increment();
receivedQuota.releaseSlot();
return null;
});
}
Expand Down
8 changes: 5 additions & 3 deletions broker/src/main/java/io/moquette/broker/Quota.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ interface Quota {

boolean hasLimit();

void decrement();
void consumeSlot();

void increment();
void releaseSlot();

boolean isConsumed();
boolean hasFreeSlots();

int getMaximum();

int availableSlots();
}
Loading
Loading