diff --git a/lib/include/Network/Clients/MQTT.hpp b/lib/include/Network/Clients/MQTT.hpp index 416e466..06e602c 100644 --- a/lib/include/Network/Clients/MQTT.hpp +++ b/lib/include/Network/Clients/MQTT.hpp @@ -40,13 +40,18 @@ namespace Network virtual uint32 maxPacketSize() const { return 2048U; } #if MQTTUseAuth == 1 - /** An authentication packet was received. - @param reasonCode Any of Success, ContinueAuthentication, ReAuthenticate + /** An authentication packet was received. + This is called either during connection and in the event loop in case the server started it + @param reasonCode Any of Success, ContinueAuthentication, ReAuthenticate or NotAuthorized, BadAuthenticationMethod @param authMethod The authentication method @param authData The authentication data @param properties If any attached to the packet, you'll find the list here. + @return true If authentication was a success or false otherwise @warning By default, no action is done upon authentication packets. It's up to you to implement those packets */ - virtual void authReceived(const ReasonCodes reasonCode, const DynamicStringView & authMethod, const DynamicBinDataView & authData, const PropertiesView & properties) { } + virtual bool authReceived(const ReasonCodes reasonCode, const DynamicStringView & authMethod, const DynamicBinDataView & authData, const PropertiesView & properties) + { + return false; + } #endif virtual ~MessageReceived() {} }; diff --git a/lib/include/Network/Clients/MQTTConfig.hpp b/lib/include/Network/Clients/MQTTConfig.hpp index 405d6b5..0ed65e8 100644 --- a/lib/include/Network/Clients/MQTTConfig.hpp +++ b/lib/include/Network/Clients/MQTTConfig.hpp @@ -12,21 +12,21 @@ #define MQTTUseAuth 0 /** Unsubscribe support. Set to 1 if you intend to unsubscribe dynamically and partially from the broker. - Typically unused for the majority of embedded case where the client is subscribing all topics at once and let + Typically unused for the majority of embedded case where the client is subscribing all topics at once and let the broker unsubscribe by itself upon disconnection, this saves binary size if left disabled Default: 0 */ #define MQTTUseUnsubscribe 0 -/** Dump all MQTT communication. - This causes a large increase in binary size, induce an important latency cost, and lower the security by - displaying potentially private informations - Default: 0 */ +/** Dump all MQTT communication. + This causes a large increase in binary size, induce an important latency cost, and lower the security by + displaying potentially private informations + Default: 0 */ #define MQTTDumpCommunication 0 /** Remove all validation from MQTT types. - This removes validation check for all MQTT types in order to save binary size. - This is only recommanded if you are sure about your broker implementation (don't set this to 1 if you + This removes validation check for all MQTT types in order to save binary size. + This is only recommanded if you are sure about your broker implementation (don't set this to 1 if you intend to connect to unknown broker) Default: 0 */ #define MQTTAvoidValidation 1 @@ -36,7 +36,7 @@ This adds a large impact to the binary size since the socket code is then duplicated (SSL and non SSL). The SSL socket code provided is using mbedtls, but one could use BearSSL if size is really limited instead. - Please notice that this has no effect if MQTTOnlyBSDSocket is 0, since ClassPath embeds its own SSL socket + Please notice that this has no effect if MQTTOnlyBSDSocket is 0, since ClassPath embeds its own SSL socket code (abstracted away at a higher level) Default: 1 */ #ifndef MQTTUseTLS @@ -44,21 +44,21 @@ #endif /** Simple socket code. - If set to true, this disables the optimized network code from ClassPath and fallback to the minimal subset + If set to true, this disables the optimized network code from ClassPath and fallback to the minimal subset of BSD socket API (typically send / recv / connect / select / close / setsockopt). This also limits binary code size but prevent using SSL/TLS (unless you write a wrapper for it). This is usually enabled for embedded system with very limited resources. - - Please notice that this also change the meaning of timeout values. If it's not set, then timeouts represent - the maximum time that a method could spend (including all sub-functions calls). It's deterministic. - When it's set, then timeouts represent the maximum inactivity time before any method times out. So if you - have a very slow connection sending 1 byte per the timeout delay, in the former case, it'll timeout after - the first byte is received, while in the latter case, it might never timeout and take up to + + Please notice that this also change the meaning of timeout values. If it's not set, then timeouts represent + the maximum time that a method could spend (including all sub-functions calls). It's deterministic. + When it's set, then timeouts represent the maximum inactivity time before any method times out. So if you + have a very slow connection sending 1 byte per the timeout delay, in the former case, it'll timeout after + the first byte is received, while in the latter case, it might never timeout and take up to `timeout * packetLength` time to return. - + Default: 0 */ -#ifndef MQTTOnlyBSDSocket +#ifndef MQTTOnlyBSDSocket #define MQTTOnlyBSDSocket 1 #endif @@ -76,31 +76,31 @@ #define CONF_UNSUB "_" #endif - + #if MQTTDumpCommunication == 1 #define CONF_DUMP "Dump_" #else #define CONF_DUMP "_" #endif - + #if MQTTAvoidValidation == 1 #define CONF_VALID "Check_" #else #define CONF_VALID "_" #endif - + #if MQTTUseTLS == 1 #define CONF_TLS "TLS_" #else #define CONF_TLS "_" #endif - + #if MQTTOnlyBSDSocket == 1 #define CONF_SOCKET "BSD" #else #define CONF_SOCKET "CP" #endif - + #pragma message("Building eMQTT5 with flags: " CONF_AUTH CONF_UNSUB CONF_DUMP CONF_VALID CONF_TLS CONF_SOCKET) #endif diff --git a/lib/src/Network/Clients/MQTTClient.cpp b/lib/src/Network/Clients/MQTTClient.cpp index 8f9139f..409fb80 100644 --- a/lib/src/Network/Clients/MQTTClient.cpp +++ b/lib/src/Network/Clients/MQTTClient.cpp @@ -1,9 +1,18 @@ // We need our implementation #include +#if MQTTUseAuth == 1 + /** Used to track reentrancy in the AUTH recursive scheme */ + enum AuthReentrancy + { + FromConnect = 0x80000000, + AuthMask = 0x7FFFFFFF, + }; +#endif + #if MQTTOnlyBSDSocket != 1 -#pragma message("This configuration is not supported and depends on external code in tests folder that is not exported upon install") +#pragma message("This configuration is not supported and depends on external code in tests folder that is not exported upon install") // We need socket declaration #include "Network/Socket.hpp" // We need SSL socket declaration too @@ -75,7 +84,7 @@ namespace Network { namespace Client { SSLContext * sslContext; #endif /** This client unique identifier */ - DynamicString clientID; + DynamicString clientID; /** The message received callback to use */ MessageReceived * cb; /** The default timeout in milliseconds */ @@ -84,16 +93,16 @@ namespace Network { namespace Client { /** The last communication time in second */ uint32 lastCommunication; /** The publish current default identifier allocator */ - uint16 publishCurrentId; + uint16 publishCurrentId; /** The keep alive delay in seconds */ - uint16 keepAlive; + uint16 keepAlive; - /** The reading state. Because data on a TCP stream is + /** The reading state. Because data on a TCP stream is a stream, we have to remember what state we are currently following while parsing data */ enum RecvState { - Ready = 0, + Ready = 0, GotType, GotLength, GotCompletePacket, @@ -109,22 +118,31 @@ namespace Network { namespace Client { /** The receiving VBInt size for the packet header */ uint8 packetExpectedVBSize; + #if MQTTUseAuth == 1 + /** Used to track the origin of the AUTH exchange */ + uint32 authSource; + #endif + + uint16 allocatePacketID() { return ++publishCurrentId; } Impl(const char * clientID, MessageReceived * callback, const DynamicBinDataView * brokerCert) - : socket(0), brokerCert(brokerCert), + : socket(0), brokerCert(brokerCert), #if MQTTUseTLS == 1 sslContext(0), - #endif + #endif + #if MQTTUseAuth == 1 + authSource(0), + #endif clientID(clientID), cb(callback), timeoutMs(3000), lastCommunication(0), publishCurrentId(0), keepAlive(300), recvState(Ready), recvBufferSize(max(callback->maxPacketSize(), 8U)), maxPacketSize(65535), available(0), recvBuffer((uint8*)::malloc(recvBufferSize)), packetExpectedVBSize(Protocol::MQTT::Common::VBInt(recvBufferSize).getSize()) {} ~Impl() { delete socket; socket = 0; ::free(recvBuffer); recvBuffer = 0; recvBufferSize = 0; } - + inline void setTimeout(uint32 timeout) { timeoutMs = timeout; } bool shouldPing() @@ -150,9 +168,9 @@ namespace Network { namespace Client { { Protocol::MQTT::Common::VBInt l; return l.readFrom(recvBuffer + 1, available - 1) != Protocol::MQTT::Common::BadData; - } + } - /** Receive a control packet from the socket in the given time. + /** Receive a control packet from the socket in the given time. @retval positive The number of bytes received @retval 0 Protocol error, you should close the socket @retval -1 Socket error @@ -181,7 +199,7 @@ namespace Network { namespace Client { case Ready: case GotType: { // Here, make sure we only fetch the length first - // The minimal size is 2 bytes for PINGRESP, DISCONNECT and AUTH. + // The minimal size is 2 bytes for PINGRESP, DISCONNECT and AUTH. // Because of this, we can't really outsmart the system everytime ret = socket->receiveReliably((char*)&recvBuffer[available], 2 - available, timeout); if (ret > 0) available += ret; @@ -213,9 +231,9 @@ namespace Network { namespace Client { if (r == Protocol::MQTT::Common::NotEnoughData) { if (available >= (packetExpectedVBSize+1)) - { // The server sends us a packet that's larger than the expected maximum size, + { // The server sends us a packet that's larger than the expected maximum size, // In MQTTv5 it's a protocol error, so let's disconnect - return 0; + return 0; } // We haven't received enough data in the given timeout to make progress, let's report a timeout recvState = GotType; @@ -274,7 +292,7 @@ namespace Network { namespace Client { // Seems to be the expected type, let's unserialize it uint32 r = packet.readFrom(recvBuffer, recvBufferSize); if (Protocol::MQTT::Common::isError(r)) return -4; // Parsing error - + // Done with receiving the packet let's remember it resetPacketReceivingState(); @@ -305,7 +323,7 @@ namespace Network { namespace Client { if (withTLS) { #if MQTTUseTLS == 1 - if (!sslContext) + if (!sslContext) { // If one certificate is given let's use it instead of the default CA bundle sslContext = brokerCert ? new SSLContext(NULL, Crypto::SSLContext::Any) : new SSLContext(); } @@ -324,7 +342,7 @@ namespace Network { namespace Client { return -1; #endif } else socket = new Socket(Network::Socket::BaseSocket::Stream); - + if (!socket) return -2; // Let the socket be asynchronous and without Nagle's algorithm if (!socket->setOption(Network::Socket::BaseSocket::Blocking, 0)) return -3; @@ -339,10 +357,117 @@ namespace Network { namespace Client { if (socket->select(false, true, timeoutMs)) return 0; return -7; } + + MQTTv5::ErrorType handleAuth() + { + Protocol::MQTT::V5::ROAuthPacket packet; + int ret = extractControlPacket(type, packet); + if (ret > 0) + { + // Parse the Auth packet and call the user method + // Try to find the auth method, and the auth data + DynamicStringView authMethod; + DynamicBinDataView authData; + Protocol::MQTT::V5::VisitorVariant visitor; + while (packet.props.getProperty(visitor) && (authMethod.length == 0 || authData.length == 0)) + { + if (visitor.propertyType() == Protocol::MQTT::V5::AuthenticationMethod) + { + auto view = visitor.as< DynamicStringView >(); + authMethod = *view; + } + else if (visitor.propertyType() == Protocol::MQTT::V5::AuthenticationData) + { + auto data = visitor.as< DynamicBinDataView >(); + authData = *data; + } + } + return cb->authReceived(packet.fixedVariableHeader.reason(), authMethod, authData, packet.props) ? MQTTv5::ErrorType::Success : MQTTv5::ErrorType::NetworkError; + } + return ErrorType::NetworkError; + } + MQTTv5::ErrorType handleConnACK() + { + // Parse the ConnACK packet; + Protocol::MQTT::V5::ROConnACKPacket packet; + int ret = extractControlPacket(type, packet); + if (ret > 0) + { + // We are only interested in the result of the connection + if (packet.fixedVariableHeader.acknowledgeFlag & 1) + { // Session is present on the server. For now, we don't care, do we ? + + } + if (packet.fixedVariableHeader.reasonCode != 0 +#if MQTTUseAuth == 1 + && packet.fixedVariableHeader.reasonCode != Protocol::MQTT::V5::NotAuthorized + && packet.fixedVariableHeader.reasonCode != Protocol::MQTT::V5::BadAuthenticationMethod +#endif + ) + { + // We have failed connection with the following reason: + return (MQTTv5::ReasonCodes)packet.fixedVariableHeader.reasonCode; + } + // Now, we are going to parse the other properties +#if MQTTUseAuth == 1 + DynamicStringView authMethod; + DynamicBinDataView authData; +#endif + Protocol::MQTT::V5::VisitorVariant visitor; + while (packet.props.getProperty(visitor)) + { + switch (visitor.propertyType()) + { + case Protocol::MQTT::V5::PacketSizeMax: + { + auto pod = visitor.as< Protocol::MQTT::V5::LittleEndianPODVisitor >(); + maxPacketSize = pod->getValue(); + break; + } + case Protocol::MQTT::V5::AssignedClientID: + { + auto view = visitor.as< Protocol::MQTT::V5::DynamicStringView >(); + clientID.from(view->data, view->length); // This allocates memory for holding the copy + break; + } + case Protocol::MQTT::V5::ServerKeepAlive: + { + auto pod = visitor.as< Protocol::MQTT::V5::LittleEndianPODVisitor >(); + keepAlive = (pod->getValue() + (pod->getValue()>>1)) >> 1; // Use 0.75 of the server's told value + break; + } +#if MQTTUseAuth == 1 + case Protocol::MQTT::V5::AuthenticationMethod: + { + auto view = visitor.as(); + authMethod = *view; + } break; + case Protocol::MQTT::V5::AuthenticationData: + { + auto data = visitor.as(); + authData = *data; + } break; +#endif + // Actually, we don't care about other properties. Maybe we should ? + default: break; + } + } +#if MQTTUseAuth == 1 + if (packet.fixedVariableHeader.reasonCode == Protocol::MQTT::V5::NotAuthorized + || packet.fixedVariableHeader.reasonCode == Protocol::MQTT::V5::BadAuthenticationMethod) + { // Let the user be aware of the required authentication properties so next connect will/can contains them + return cb->authReceived((ReasonCodes)packet.fixedVariableHeader.reasonCode, authMethod, authData, packet.props) ? ErrorType::Success : ErrorType::NetworkError; + } +#endif + return ErrorType::Success; + } + return Protocol::MQTT::V5::ProtocolError; + } + }; #else #ifndef MQTTLock - /* If you have a true lock object in your system (for example, in FreeRTOS, use a mutex), + /* If you have a true lock object in your system (for example, in FreeRTOS, use a mutex), you should provide one instead of this one as this one just burns CPU while waiting */ class SpinLock { @@ -353,12 +478,12 @@ namespace Network { namespace Client { /** Acquire the lock */ inline void acquire() volatile { - while (state.exchange(true, std::memory_order_acq_rel)) + while (state.exchange(true, std::memory_order_acq_rel)) { // Put a sleep method here (using select here since it's cross platform in BSD socket API) struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 500; // Wait 0.5ms per loop - select(0, NULL, NULL, NULL, &tv); + select(0, NULL, NULL, NULL, &tv); } } /** Try to acquire the lock */ @@ -406,11 +531,11 @@ namespace Network { namespace Client { #endif #if MQTTUseTLS == 1 - // Small optimization to remove useless virtual table in the final binary if not used + // Small optimization to remove useless virtual table in the final binary if not used #define MQTTVirtual virtual #else #define MQTTVirtual -#endif +#endif struct BaseSocket { @@ -424,7 +549,7 @@ namespace Network { namespace Client { // Please notice that under linux, it's not required to set the socket // as non blocking if you define SO_SNDTIMEO, for connect timeout. - // so the code below could be optimized away. Yet, lwIP does show the + // so the code below could be optimized away. Yet, lwIP does show the // same behavior so when a timeout for connection is actually required // you must issue a select call here. @@ -459,7 +584,7 @@ namespace Network { namespace Client { if (ret == 0) return 0; // Here, we need to wait until connection happens or times out - if (select(false, true)) + if (select(false, true)) { // Restore blocking behavior here if (::fcntl(socket, F_SETFL, socketFlags) != 0) return -3; @@ -521,7 +646,7 @@ namespace Network { namespace Client { mbedtls_ssl_config conf; mbedtls_x509_crt cacert; mbedtls_net_context net; - + private: bool buildConf(const MQTTv5::DynamicBinDataView * brokerCert) { @@ -539,11 +664,11 @@ namespace Network { namespace Client { ::mbedtls_ssl_conf_authmode(&conf, brokerCert ? MBEDTLS_SSL_VERIFY_REQUIRED : MBEDTLS_SSL_VERIFY_NONE); uint32_t ms = timeoutMs.tv_usec / 1000; - ::mbedtls_ssl_conf_read_timeout(&conf, ms < 50 ? 3000 : ms); + ::mbedtls_ssl_conf_read_timeout(&conf, ms < 50 ? 3000 : ms); // Random number generator ::mbedtls_ssl_conf_rng(&conf, ::mbedtls_ctr_drbg_random, &entropySource); - if (::mbedtls_ctr_drbg_seed(&entropySource, ::mbedtls_entropy_func, &entropy, NULL, 0)) + if (::mbedtls_ctr_drbg_seed(&entropySource, ::mbedtls_entropy_func, &entropy, NULL, 0)) return false; if (::mbedtls_ssl_setup(&ssl, &conf)) @@ -581,15 +706,15 @@ namespace Network { namespace Client { ::mbedtls_ssl_set_bio(&ssl, &net, ::mbedtls_net_send, NULL, ::mbedtls_net_recv_timeout); ret = ::mbedtls_ssl_handshake(&ssl); - if (ret != 0 && ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE) + if (ret != 0 && ret != MBEDTLS_ERR_SSL_WANT_READ && ret != MBEDTLS_ERR_SSL_WANT_WRITE) return -10; - + // Check certificate if one provided - if (brokerCert) + if (brokerCert) { uint32_t flags = mbedtls_ssl_get_verify_result(&ssl); if (flags != 0) - { + { #if MQTTDumpCommunication == 1 char verify_buf[100] = {0}; mbedtls_x509_crt_verify_info(verify_buf, sizeof(verify_buf), " ! ", flags); @@ -615,7 +740,7 @@ namespace Network { namespace Client { while (ret < minLength) { int r = ::mbedtls_ssl_read(&ssl, (uint8*)&buffer[ret], minLength - ret); - if (r <= 0) + if (r <= 0) { // Those means that we need to call again the read method if (r == MBEDTLS_ERR_SSL_WANT_READ || r == MBEDTLS_ERR_SSL_WANT_WRITE) @@ -657,7 +782,7 @@ namespace Network { namespace Client { /** The DER encoded certificate (if provided) */ const DynamicBinDataView * brokerCert; /** This client unique identifier */ - DynamicString clientID; + DynamicString clientID; /** The message received callback to use */ MessageReceived * cb; /** The default timeout in milliseconds */ @@ -666,21 +791,25 @@ namespace Network { namespace Client { /** The last communication time in second */ uint32 lastCommunication; /** The publish current default identifier allocator */ - uint16 publishCurrentId; + uint16 publishCurrentId; /** The keep alive delay in seconds */ uint16 keepAlive; -#if MQTTUseUnsubscribe == 1 +#if MQTTUseAuth == 1 + /** Mask used to track the origin of the AUTH exchange and reentrancy issues */ + uint32 authSource; +#endif +#if MQTTUseUnsubscribe == 1 /** The last unsubscribe id */ uint16 unsubscribeId; /** The last unsubscribe error code */ - MQTTv5::ErrorType::Type lastUnsubscribeError; + MQTTv5::ErrorType::Type lastUnsubscribeError; #endif - /** The reading state. Because data on a TCP stream is + /** The reading state. Because data on a TCP stream is a stream, we have to remember what state we are currently following while parsing data */ enum RecvState { - Ready = 0, + Ready = 0, GotType, GotLength, GotCompletePacket, @@ -703,7 +832,10 @@ namespace Network { namespace Client { Impl(const char * clientID, MessageReceived * callback, const DynamicBinDataView * brokerCert) : socket(0), brokerCert(brokerCert), clientID(clientID), cb(callback), timeoutMs({3, 0}), lastCommunication(0), publishCurrentId(0), keepAlive(300), -#if MQTTUseUnsubscribe == 1 +#if MQTTUseAuth == 1 + authSource(0), +#endif +#if MQTTUseUnsubscribe == 1 unsubscribeId(0), lastUnsubscribeError(ErrorType::WaitingForResult), #endif recvState(Ready), recvBufferSize(max(callback->maxPacketSize(), 8U)), maxPacketSize(65535), available(0), recvBuffer((uint8*)::malloc(recvBufferSize)), packetExpectedVBSize(Protocol::MQTT::Common::VBInt(recvBufferSize).getSize()) @@ -725,9 +857,9 @@ namespace Network { namespace Client { { Protocol::MQTT::Common::VBInt l; return l.readFrom(recvBuffer + 1, available - 1) != Protocol::MQTT::Common::BadData; - } + } - /** Receive a control packet from the socket in the given time. + /** Receive a control packet from the socket in the given time. @retval positive The number of bytes received @retval 0 Protocol error, you should close the socket @retval -1 Socket error @@ -755,7 +887,7 @@ namespace Network { namespace Client { case Ready: case GotType: { // Here, make sure we only fetch the length first - // The minimal size is 2 bytes for PINGRESP and shortcut DISCONNECT / AUTH. + // The minimal size is 2 bytes for PINGRESP and shortcut DISCONNECT / AUTH. // Because of this, we can't really outsmart the system everytime ret = socket->recv((char*)&recvBuffer[available], 2 - available); if (ret > 0) available += ret; @@ -785,9 +917,9 @@ namespace Network { namespace Client { if (r == Protocol::MQTT::Common::NotEnoughData) { if (available >= (packetExpectedVBSize+1)) - { // The server sends us a packet that's larger than the expected maximum size, + { // The server sends us a packet that's larger than the expected maximum size, // In MQTTv5 it's a protocol error, so let's disconnect - return 0; + return 0; } // We haven't received enough data in the given timeout to make progress, let's report a timeout recvState = GotType; @@ -840,7 +972,7 @@ namespace Network { namespace Client { // Seems to be the expected type, let's unserialize it uint32 r = packet.readFrom(recvBuffer, recvBufferSize); if (Protocol::MQTT::Common::isError(r)) return -4; // Parsing error - + // Done with receiving the packet let's remember it resetPacketReceivingState(); @@ -870,17 +1002,125 @@ namespace Network { namespace Client { int connectWith(const char * host, const uint16 port, const bool withTLS) { if (isOpen()) return -1; - socket = + socket = #if MQTTUseTLS == 1 withTLS ? new MBTLSSocket(timeoutMs) : #endif new BaseSocket(timeoutMs); return socket ? socket->connect(host, port, brokerCert) : -1; } + +#if MQTTUseAuth == 1 + MQTTv5::ErrorType handleAuth() + { + Protocol::MQTT::V5::ROAuthPacket packet; + int ret = extractControlPacket(Protocol::MQTT::V5::AUTH, packet); + if (ret > 0) + { + // Parse the Auth packet and call the user method + // Try to find the auth method, and the auth data + DynamicStringView authMethod; + DynamicBinDataView authData; + Protocol::MQTT::V5::VisitorVariant visitor; + while (packet.props.getProperty(visitor) && (authMethod.length == 0 || authData.length == 0)) + { + if (visitor.propertyType() == Protocol::MQTT::V5::AuthenticationMethod) + { + auto view = visitor.as< DynamicStringView >(); + authMethod = *view; + } + else if (visitor.propertyType() == Protocol::MQTT::V5::AuthenticationData) + { + auto data = visitor.as< DynamicBinDataView >(); + authData = *data; + } + } + return cb->authReceived(packet.fixedVariableHeader.reason(), authMethod, authData, packet.props) ? MQTTv5::ErrorType::Success : MQTTv5::ErrorType::NetworkError; + } + return ErrorType::NetworkError; + } +#endif + MQTTv5::ErrorType handleConnACK() + { + // Parse the ConnACK packet; + Protocol::MQTT::V5::ROConnACKPacket packet; + int ret = extractControlPacket(Protocol::MQTT::V5::CONNACK, packet); + if (ret > 0) + { + // We are only interested in the result of the connection + if (packet.fixedVariableHeader.acknowledgeFlag & 1) + { // Session is present on the server. For now, we don't care, do we ? + + } + if (packet.fixedVariableHeader.reasonCode != 0 +#if MQTTUseAuth == 1 + && packet.fixedVariableHeader.reasonCode != Protocol::MQTT::V5::NotAuthorized + && packet.fixedVariableHeader.reasonCode != Protocol::MQTT::V5::BadAuthenticationMethod +#endif + ) + { + // We have failed connection with the following reason: + return (MQTTv5::ReasonCodes)packet.fixedVariableHeader.reasonCode; + } + // Now, we are going to parse the other properties +#if MQTTUseAuth == 1 + DynamicStringView authMethod; + DynamicBinDataView authData; +#endif + Protocol::MQTT::V5::VisitorVariant visitor; + while (packet.props.getProperty(visitor)) + { + switch (visitor.propertyType()) + { + case Protocol::MQTT::V5::PacketSizeMax: + { + auto pod = visitor.as< Protocol::MQTT::V5::LittleEndianPODVisitor >(); + maxPacketSize = pod->getValue(); + break; + } + case Protocol::MQTT::V5::AssignedClientID: + { + auto view = visitor.as< Protocol::MQTT::V5::DynamicStringView >(); + clientID.from(view->data, view->length); // This allocates memory for holding the copy + break; + } + case Protocol::MQTT::V5::ServerKeepAlive: + { + auto pod = visitor.as< Protocol::MQTT::V5::LittleEndianPODVisitor >(); + keepAlive = (pod->getValue() + (pod->getValue()>>1)) >> 1; // Use 0.75 of the server's told value + break; + } +#if MQTTUseAuth == 1 + case Protocol::MQTT::V5::AuthenticationMethod: + { + auto view = visitor.as(); + authMethod = *view; + } break; + case Protocol::MQTT::V5::AuthenticationData: + { + auto data = visitor.as(); + authData = *data; + } break; +#endif + // Actually, we don't care about other properties. Maybe we should ? + default: break; + } + } +#if MQTTUseAuth == 1 + if (packet.fixedVariableHeader.reasonCode == Protocol::MQTT::V5::NotAuthorized + || packet.fixedVariableHeader.reasonCode == Protocol::MQTT::V5::BadAuthenticationMethod) + { // Let the user be aware of the required authentication properties so next connect will/can contains them + return cb->authReceived((ReasonCodes)packet.fixedVariableHeader.reasonCode, authMethod, authData, packet.props) ? ErrorType::Success : ErrorType::NetworkError; + } +#endif + return ErrorType::Success; + } + return Protocol::MQTT::V5::ProtocolError; + } }; #endif - MQTTv5::MQTTv5(const char * clientID, MessageReceived * callback, const DynamicBinDataView * brokerCert) : impl(new Impl(clientID, callback, brokerCert)) {} + MQTTv5::MQTTv5(const char * clientID, MessageReceived * callback, const DynamicBinDataView * brokerCert) : impl(new Impl(clientID, callback, brokerCert)) {} MQTTv5::~MQTTv5() { delete impl; impl = 0; } MQTTv5::ErrorType::Type MQTTv5::prepareSAR(Protocol::MQTT::V5::ControlPacketSerializable & packet, bool withAnswer) @@ -896,7 +1136,7 @@ namespace Network { namespace Client { // packet.dump(out, 2); // printf("Prepared:\n%s\n", (const char*)out); #endif - // Make sure we are on a clean receiving state + // Make sure we are on a clean receiving state impl->resetPacketReceivingState(); if (impl->send((const char*)buffer, packetSize) != packetSize) @@ -914,15 +1154,15 @@ namespace Network { namespace Client { return ErrorType::Success; } - // Connect to the given server URL. + // Connect to the given server URL. MQTTv5::ErrorType MQTTv5::connectTo(const char * serverHost, const uint16 port, bool useTLS, const uint16 keepAliveTimeInSec, - const bool cleanStart, const char * userName, const DynamicBinDataView * password, WillMessage * willMessage, const QoSDelivery willQoS, const bool willRetain, + const bool cleanStart, const char * userName, const DynamicBinDataView * password, WillMessage * willMessage, const QoSDelivery willQoS, const bool willRetain, Properties * properties) { if (serverHost == nullptr || !port) return ErrorType::BadParameter; - // Please do not move the line below as it must outlive the packet + // Please do not move the line below as it must outlive the packet Protocol::MQTT::V5::Property maxProp(Protocol::MQTT::V5::PacketSizeMax, impl->recvBufferSize); Protocol::MQTT::V5::ControlPacket packet; @@ -945,7 +1185,7 @@ namespace Network { namespace Client { if (int ret = impl->connectWith(serverHost, port, useTLS)) return ret == -7 ? ErrorType::TimedOut : ErrorType::NetworkError; - // Create the header object now + // Create the header object now impl->keepAlive = (keepAliveTimeInSec + (keepAliveTimeInSec / 2)) / 2; // Make it 75% of what's given so we always wake up before doom's clock packet.fixedVariableHeader.keepAlive = keepAliveTimeInSec; packet.fixedVariableHeader.cleanStart = cleanStart ? 1 : 0; @@ -969,109 +1209,52 @@ namespace Network { namespace Client { Protocol::MQTT::V5::ControlPacketType type = impl->getLastPacketType(); if (type == Protocol::MQTT::V5::CONNACK) { - // Parse the ConnACK packet; - Protocol::MQTT::V5::ROConnACKPacket packet; - int ret = impl->extractControlPacket(type, packet); - if (ret > 0) - { - // We are only interested in the result of the connection - if (packet.fixedVariableHeader.acknowledgeFlag & 1) - { // Session is present on the server. For now, we don't care, do we ? - - } - if (packet.fixedVariableHeader.reasonCode != 0 -#if MQTTUseAuth == 1 - && packet.fixedVariableHeader.reasonCode != Protocol::MQTT::V5::NotAuthorized - && packet.fixedVariableHeader.reasonCode != Protocol::MQTT::V5::BadAuthenticationMethod -#endif - ) - { - // We have failed connection with the following reason: - impl->close(); - return (MQTTv5::ReasonCodes)packet.fixedVariableHeader.reasonCode; - } - // Now, we are going to parse the other properties -#if MQTTUseAuth == 1 - DynamicStringView authMethod; - DynamicBinDataView authData; -#endif - Protocol::MQTT::V5::VisitorVariant visitor; - while (packet.props.getProperty(visitor)) - { - switch (visitor.propertyType()) - { - case Protocol::MQTT::V5::PacketSizeMax: - { - auto pod = visitor.as< Protocol::MQTT::V5::LittleEndianPODVisitor >(); - impl->maxPacketSize = pod->getValue(); - break; - } - case Protocol::MQTT::V5::AssignedClientID: - { - auto view = visitor.as< Protocol::MQTT::V5::DynamicStringView >(); - impl->clientID.from(view->data, view->length); // This allocates memory for holding the copy - break; - } - case Protocol::MQTT::V5::ServerKeepAlive: - { - auto pod = visitor.as< Protocol::MQTT::V5::LittleEndianPODVisitor >(); - impl->keepAlive = (pod->getValue() + (pod->getValue()>>1)) >> 1; // Use 0.75 of the server's told value - break; - } -#if MQTTUseAuth == 1 - case Protocol::MQTT::V5::AuthenticationMethod: - { - auto view = visitor.as(); - authMethod = *view; - } break; - case Protocol::MQTT::V5::AuthenticationData: - { - auto data = visitor.as(); - authData = *data; - } break; -#endif - // Actually, we don't care about other properties. Maybe we should ? - default: break; - } - } + ErrorType ret = impl->handleConnACK(); #if MQTTUseAuth == 1 - if (packet.fixedVariableHeader.reasonCode == Protocol::MQTT::V5::NotAuthorized - || packet.fixedVariableHeader.reasonCode == Protocol::MQTT::V5::BadAuthenticationMethod) - { // Let the user be aware of the required authentication properties so next connect will/can contains them - impl->cb->authReceived((ReasonCodes)packet.fixedVariableHeader.reasonCode, authMethod, authData, packet.props); - return (ReasonCodes)packet.fixedVariableHeader.reasonCode; - } -#endif - return ErrorType::Success; - } + // No special treatment anymore for AUTH packets + impl->authSource = 0; +#endif + if (ret != ErrorType::Success) + impl->close(); + return ret; } -#if MQTTUseAuth == 1 +#if MQTTUseAuth == 1 else if (type == Protocol::MQTT::V5::AUTH) { - Protocol::MQTT::V5::ROAuthPacket packet; - int ret = impl->extractControlPacket(type, packet); - if (ret > 0) - { - // Parse the Auth packet and call the user method - // Try to find the auth method, and the auth data - DynamicStringView authMethod; - DynamicBinDataView authData; - Protocol::MQTT::V5::VisitorVariant visitor; - while (packet.props.getProperty(visitor) && (authMethod.length == 0 || authData.length == 0)) + // Authentication need to know if we are in a CONNECT/CONNACK process since it behaves differently in that case + impl->authSource |= FromConnect | 1; + + if (impl->handleAuth() == ErrorType::Success) + { // We need to receive either a CONNACK or a AUTH packet now, so let's do that until we're done + while (true) { - if (visitor.propertyType() == Protocol::MQTT::V5::AuthenticationMethod) + // Ok, now we have a packet read it + type = impl->getLastPacketType(); + if (type == Protocol::MQTT::V5::CONNACK) { - auto view = visitor.as< DynamicStringView >(); - authMethod = *view; + ErrorType ret = impl->handleConnACK(); + if (ret != ErrorType::Success) + impl->close(); + + impl->authSource = 0; + return ret; } - else if (visitor.propertyType() == Protocol::MQTT::V5::AuthenticationData) + else if (type == Protocol::MQTT::V5::AUTH) { - auto data = visitor.as< DynamicBinDataView >(); - authData = *data; + if (ErrorType ret = impl->handleAuth()) + { // In case of authentication error, let's report back up + impl->close(); + impl->authSource = 0; + return ret; + } // Else, let's continue the Authentication dance + } + else + { + impl->close(); + impl->authSource = 0; + return Protocol::MQTT::V5::ProtocolError; } } - impl->cb->authReceived(packet.fixedVariableHeader.reason(), authMethod, authData, packet.props); - return ErrorType::Success; } } #endif @@ -1081,12 +1264,19 @@ namespace Network { namespace Client { } #if MQTTUseAuth == 1 + struct ConditionalScopedLock + { + Lock * a; + ConditionalScopedLock(Lock * a) : a(a) { if (a) a->acquire(); } + ~ConditionalScopedLock() { if (a) a->release(); } + }; + // Authenticate with the given server. MQTTv5::ErrorType MQTTv5::auth(const ReasonCodes reasonCode, const DynamicStringView & authMethod, const DynamicBinDataView & authData, Properties * properties) { - if (reasonCode != Protocol::MQTT::V5::Success || reasonCode != Protocol::MQTT::V5::ContinueAuthentication || reasonCode != Protocol::MQTT::V5::ReAuthenticate) + if (reasonCode != Protocol::MQTT::V5::Success && reasonCode != Protocol::MQTT::V5::ContinueAuthentication && reasonCode != Protocol::MQTT::V5::ReAuthenticate) return ErrorType::BadParameter; - if (!properties && (authMethod.length == 0 || authData.length == 0)) + if (!properties && (authMethod.length == 0 || authData.length == 0)) return ErrorType::BadParameter; // A auth method is required // Don't move this around, it must appear on stack until it's no more used @@ -1095,7 +1285,7 @@ namespace Network { namespace Client { Protocol::MQTT::V5::AuthPacket packet; - ScopedLock scope(impl->lock); + ConditionalScopedLock scope((impl->authSource & AuthMask) ? 0 : &impl->lock); if (!impl->isOpen()) return ErrorType::NotConnected; if (impl->getLastPacketType() != Protocol::MQTT::V5::RESERVED) return ErrorType::TranscientPacket; @@ -1106,7 +1296,7 @@ namespace Network { namespace Client { // Check if we have a auth method packet.props.append(&method); // That'll fail silently if it already exists packet.props.append(&data); // That'll fail silently if it already exists - + packet.fixedVariableHeader.reasonCode = reasonCode; // Then send the packet @@ -1116,40 +1306,22 @@ namespace Network { namespace Client { Protocol::MQTT::V5::ControlPacketType type = impl->getLastPacketType(); if (type == Protocol::MQTT::V5::AUTH) { - Protocol::MQTT::V5::ROAuthPacket packet; - int ret = impl->extractControlPacket(type, packet); - if (ret > 0) - { - // Parse the Auth packet and call the user method - // Try to find the auth method, and the auth data - DynamicStringView authMethod; - DynamicBinDataView authData; - Protocol::MQTT::V5::VisitorVariant visitor; - while (packet.props.getProperty(visitor) && (authMethod.length == 0 || authData.length == 0)) - { - if (visitor.propertyType() == Protocol::MQTT::V5::AuthenticationMethod) - { - auto view = visitor.as< DynamicStringView >(); - authMethod = *view; - } - else if (visitor.propertyType() == Protocol::MQTT::V5::AuthenticationData) - { - auto data = visitor.as< DynamicBinDataView >(); - authData = *data; - } - } - impl->cb->authReceived(packet.fixedVariableHeader.reason(), authMethod, authData, packet.props); - return ErrorType::Success; - } + impl->authSource++; + ErrorType ret = impl->handleAuth(); + impl->authSource--; + return ret; + } else if (type == Protocol::MQTT::V5::CONNACK && (impl->authSource & FromConnect) > 0) + { // We don't signal any error here, it's up to the parent's connectTo to check this packet + return ErrorType::Success; } - return Protocol::MQTT::V5::ProtocolError; + return Protocol::MQTT::V5::ProtocolError; } #endif // Subscribe to a topic. MQTTv5::ErrorType MQTTv5::subscribe(const char * _topic, const RetainHandling retainHandling, const bool withAutoFeedBack, const QoSDelivery maxAcceptedQoS, const bool retainAsPublished, Properties * properties) { - if (_topic == nullptr) + if (_topic == nullptr) return ErrorType::BadParameter; // Create the subscribe topic here @@ -1190,7 +1362,7 @@ namespace Network { namespace Client { Protocol::MQTT::V5::ROSubACKPacket rpacket; int ret = impl->extractControlPacket(type, rpacket); if (ret <= 0) return ErrorType::TranscientPacket; - + if (rpacket.fixedVariableHeader.packetID != packet.fixedVariableHeader.packetID) return ErrorType::TranscientPacket; @@ -1209,7 +1381,7 @@ namespace Network { namespace Client { return MQTTv5::ErrorType::NetworkError; } -#if MQTTUseUnsubscribe == 1 +#if MQTTUseUnsubscribe == 1 MQTTv5::ErrorType MQTTv5::unsubscribe(UnsubscribeTopic & topics, Properties * properties) { ScopedLock scope(impl->lock); @@ -1232,7 +1404,7 @@ namespace Network { namespace Client { packet.fixedVariableHeader.packetID = impl->allocatePacketID(); - impl->unsubscribeId = packet.fixedVariableHeader.packetID; + impl->unsubscribeId = packet.fixedVariableHeader.packetID; packet.payload.topics = &topics; // Then send the packet @@ -1242,7 +1414,7 @@ namespace Network { namespace Client { // Unsubscribe answer will be fetched later on, use getUnsubscribeResult if you are interested in that return ErrorType::Success; } - + MQTTv5::ErrorType MQTTv5::getUnsubscribeResult() { ScopedLock scope(impl->lock); @@ -1263,24 +1435,24 @@ namespace Network { namespace Client { uint16 packetID = ((Protocol::MQTT::V5::FixedField &)publishPacket.fixedVariableHeader).packetID; static Protocol::MQTT::V5::ControlPacketType nexts[3] = { Protocol::MQTT::V5::RESERVED, Protocol::MQTT::V5::PUBACK, Protocol::MQTT::V5::PUBREC }; - Protocol::MQTT::V5::ControlPacketType next = nexts[QoS]; + Protocol::MQTT::V5::ControlPacketType next = nexts[QoS]; /* The state machine is like this: SEND RECV - [ PUB ] => Send + [ PUB ] => Send - - - - - - - - - - - - Receive [ PKT ] + Receive [ PKT ] PKT == ACK ? PKT == ACK ? / Yes \ No (REC) / Yes \ No (REC) Assert ID Assert ID Send ACK Send REC - | | | - - - - - - - - - + | | | - - - - - - - - - Stop [ REL ] => Send Stop Receive [REL] - - - - - - - - - - | Receive [COMP] Send [COMP] - | | + | | Assert ID Stop - | - Stop + | + Stop */ if (sending) @@ -1306,15 +1478,15 @@ namespace Network { namespace Client { // Ensure it's matching the packet ID if (reply.fixedVariableHeader.packetID != packetID) // Could be a protocol error, but this will be checked in the next call to eventLoop - return ErrorType::TranscientPacket; - + return ErrorType::TranscientPacket; + // Compute the expected next packet next = Protocol::MQTT::Common::Helper::getNextPacketType(next); - if (next == Protocol::MQTT::V5::RESERVED) + if (next == Protocol::MQTT::V5::RESERVED) return ErrorType::Success; } else sending = true; - // Check if we need to send something + // Check if we need to send something Protocol::MQTT::V5::PublishReplyPacket answer(next); answer.fixedVariableHeader.packetID = packetID; next = Protocol::MQTT::Common::Helper::getNextPacketType(next); @@ -1327,7 +1499,7 @@ namespace Network { namespace Client { // Publish to a topic. MQTTv5::ErrorType MQTTv5::publish(const char * topic, const uint8 * payload, const uint32 payloadLength, const bool retain, const QoSDelivery QoS, const uint16 packetIdentifier, Properties * properties) { - if (topic == nullptr) + if (topic == nullptr) return ErrorType::BadParameter; ScopedLock scope(impl->lock); @@ -1395,7 +1567,7 @@ namespace Network { namespace Client { switch (type) { case Protocol::MQTT::V5::PINGRESP: break; // We ignore ping response - case Protocol::MQTT::V5::DISCONNECT: impl->close(); return ErrorType::NotConnected; // No work to perform upon server sending disconnect + case Protocol::MQTT::V5::DISCONNECT: impl->close(); return ErrorType::NotConnected; // No work to perform upon server sending disconnect case Protocol::MQTT::V5::PUBLISH: { Protocol::MQTT::V5::ROPublishPacket packet; @@ -1405,14 +1577,14 @@ namespace Network { namespace Client { impl->cb->messageReceived(packet.fixedVariableHeader.topicName, DynamicBinDataView(packet.payload.size, packet.payload.data), packet.fixedVariableHeader.packetID, packet.props); return enterPublishCycle(packet, false); } -#if MQTTUseUnsubscribe == 1 +#if MQTTUseUnsubscribe == 1 case Protocol::MQTT::V5::UNSUBACK: { Protocol::MQTT::V5::ROUnsubACKPacket rpacket; int ret = impl->extractControlPacket(type, rpacket); if (ret == 0) { impl->close(); return ErrorType::NotConnected; } if (ret < 0) return ErrorType::NetworkError; - + if (rpacket.fixedVariableHeader.packetID != impl->unsubscribeId) return ErrorType::NetworkError; @@ -1428,24 +1600,31 @@ namespace Network { namespace Client { impl->unsubscribeId = 0; } #endif - default: // Ignore all other packets currently +#if MQTTUseAuth == 1 + case Protocol::MQTT::V5::AUTH: + { + return impl->handleAuth(); + } +#endif + + default: // Ignore all other packets currently break; } impl->resetPacketReceivingState(); return ErrorType::Success; - } + } // Disconnect from the server MQTTv5::ErrorType MQTTv5::disconnect(const ReasonCodes code, Properties * properties) { - if (code != ReasonCodes::NormalDisconnection && code != ReasonCodes::DisconnectWithWillMessage && code < ReasonCodes::UnspecifiedError) + if (code != ReasonCodes::NormalDisconnection && code != ReasonCodes::DisconnectWithWillMessage && code < ReasonCodes::UnspecifiedError) return ErrorType::BadParameter; ScopedLock scope(impl->lock); if (!impl->isOpen()) return ErrorType::Success; - + Protocol::MQTT::V5::ControlPacket packet; packet.fixedVariableHeader.reasonCode = code; // Capture properties (to avoid copying them) diff --git a/tests/MQTTc.cpp b/tests/MQTTc.cpp index c5738d8..2196979 100644 --- a/tests/MQTTc.cpp +++ b/tests/MQTTc.cpp @@ -2,7 +2,7 @@ #include #include -// We need MQTT client +// We need MQTT client #include "Network/Clients/MQTT.hpp" // We need URL parsing too #include "Network/Address.hpp" @@ -20,14 +20,36 @@ struct InitLogger { struct MessageReceiver : public Network::Client::MessageReceived { - void messageReceived(const Network::Client::MQTTv5::DynamicStringView & topic, const Network::Client::MQTTv5::DynamicBinDataView & payload, + void messageReceived(const Network::Client::MQTTv5::DynamicStringView & topic, const Network::Client::MQTTv5::DynamicBinDataView & payload, const uint16 packetIdentifier, const Network::Client::MQTTv5::PropertiesView & properties) { fprintf(stdout, "Msg received: (%04X)\n", packetIdentifier); fprintf(stdout, " Topic: %.*s\n", topic.length, topic.data); fprintf(stdout, " Payload: %.*s\n", payload.length, payload.data); } +#if MQTTUseAuth == 1 + bool authReceived(const ReasonCodes reasonCode, const DynamicStringView & authMethod, const DynamicBinDataView & authData, const PropertiesView & properties) + { + fprintf(stdout, "Auth packet received\n"); + fprintf(stdout, " AuthMethod: %.*s\n", authMethod.length, authMethod.data); + fprintf(stdout, " AuthData: %.*s\n", authData.length, authData.data); + fprintf(stdout, " Reason Code: %d\n", (int)reasonCode); + if (authData.length != strlen("Whizz") || memcmp(authData.data, "Whizz", authData.length)) + { + fprintf(stdout, "Bad authentication answer from server"); + return false; + } + DynamicBinDataView data(strlen("Bees"), (const uint8*)"Bees"); + if (Network::Client::MQTTv5::ErrorType ret = client->auth(Protocol::MQTT::V5::ContinueAuthentication, authMethod, data)) + { + fprintf(stdout, "Failed auth with error: %d\n", (int)ret); + return false; + } + return true; + } + Network::Client::MQTTv5 * client; +#endif }; String publishTopic, publishMessage; @@ -44,7 +66,7 @@ String setQoS(const String & qos) if (qos == "0" || qos.caselessEqual("atmostone")) QoS = Network::Client::MQTTv5::QoSDelivery::AtMostOne; else if (qos == "1" || qos.caselessEqual("atleastone")) QoS = Network::Client::MQTTv5::QoSDelivery::AtLeastOne; else if (qos == "2" || qos.caselessEqual("exactlyone")) QoS = Network::Client::MQTTv5::QoSDelivery::ExactlyOne; - else + else { return "Please specify either 0 or atleastone, 1 or atmostone, 2 or exactlyone for QoS option"; } @@ -84,17 +106,17 @@ String readFile(const String & path) int main(int argc, const char ** argv) { - + String server; String username; String password; String clientID; String subscribe; String certFile; - unsigned keepAlive = 300; + unsigned keepAlive = 300; bool dumpComm = false; bool retainPublishedMessage = false; - + Arguments::declare(server, "The server URL (for example 'mqtt.mine.com:1883')", "server"); Arguments::declare(username, "The username to use", "username"); @@ -108,9 +130,9 @@ int main(int argc, const char ** argv) Arguments::declare(certFile, "Expected broker certificate in DER format", "der"); Arguments::declare(dumpComm, "Dump communication", "verbose"); - + String error = Arguments::parse(argc, argv); - if (error) + if (error) { fprintf(stderr, "%s\n", (const char*)error); return argc != 1; @@ -126,7 +148,7 @@ int main(int argc, const char ** argv) Network::Address::URL serverURL(server); uint16 port = serverURL.stripPortFromAuthority(1883); MessageReceiver receiver; - + #if MQTTUseTLS == 1 Protocol::MQTT::Common::DynamicBinaryData brokerCert; if (certFile) @@ -142,10 +164,23 @@ int main(int argc, const char ** argv) #endif Network::Client::MQTTv5::DynamicBinDataView pw(password.getLength(), (const uint8*)password); - if (Network::Client::MQTTv5::ErrorType ret = client.connectTo(serverURL.getAuthority(), port, serverURL.getScheme().caselessEqual("mqtts"), +#if MQTTUseAuth == 1 + receiver.client = &client; + Protocol::MQTT::V5::Property method(Protocol::MQTT::V5::AuthenticationMethod, Network::Client::MQTTv5::DynamicStringView("DumbledoreOffice")); + Protocol::MQTT::V5::Property data(Protocol::MQTT::V5::AuthenticationData, Network::Client::MQTTv5::DynamicBinDataView(strlen("Fizz"), (const uint8*)"Fizz")); + Protocol::MQTT::V5::Properties props; + + props.append(&method); + props.append(&data); + + if (Network::Client::MQTTv5::ErrorType ret = client.connectTo(serverURL.getAuthority(), port, serverURL.getScheme().caselessEqual("mqtts"), + (uint16)min(65535U, keepAlive), true, username ? (const char*)username : nullptr, password ? &pw : nullptr, nullptr, QoS, false, &props)) +#else + if (Network::Client::MQTTv5::ErrorType ret = client.connectTo(serverURL.getAuthority(), port, serverURL.getScheme().caselessEqual("mqtts"), (uint16)min(65535U, keepAlive), true, username ? (const char*)username : nullptr, password ? &pw : nullptr)) +#endif { - return fprintf(stderr, "Failed connection to %s with error: %d\n", (const char*)serverURL.asText(), (int)ret); + return fprintf(stderr, "Failed connection to %s with error: %d\n", (const char*)serverURL.asText(), (int)ret); } printf("Connected to %s\n", (const char*)serverURL.asText()); @@ -168,7 +203,7 @@ int main(int argc, const char ** argv) } #if MQTTUseUnsubscribe == 1 - // Unsubscribe from the topic here + // Unsubscribe from the topic here Protocol::MQTT::V5::UnsubscribeTopic topic((const char*)subscribe, true); if (Network::Client::MQTTv5::ErrorType ret = client.unsubscribe(topic, 0)) { @@ -177,7 +212,7 @@ int main(int argc, const char ** argv) // Run the event loop once more to fetch the unsubscribe ACK (not absolutely required when leaving, but for sample code if (Network::Client::MQTTv5::ErrorType ret = client.eventLoop()) return fprintf(stderr, "Event loop failed with error: %d\n", (int)ret); - + Network::Client::MQTTv5::ErrorType ret = client.getUnsubscribeResult(); fprintf(ret == 0 ? stdout : stderr, "Unsubscribe result: %d\n", (int)ret); #endif