From 1e2cd7ab526b1e27dad8ec47009724a7b7dc82a4 Mon Sep 17 00:00:00 2001 From: Dakshit Babbar Date: Tue, 10 Sep 2024 17:04:59 +0530 Subject: [PATCH] Add an intermediate connection state to handle disconnects due to network failure --- source/core_mqtt.c | 253 +++++++++++++------------- source/core_mqtt_serializer.c | 24 ++- source/include/core_mqtt.h | 10 +- source/include/core_mqtt_serializer.h | 7 +- 4 files changed, 156 insertions(+), 138 deletions(-) diff --git a/source/core_mqtt.c b/source/core_mqtt.c index 096f29f9e..b3342d21e 100644 --- a/source/core_mqtt.c +++ b/source/core_mqtt.c @@ -266,7 +266,7 @@ static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType ); * * @return Number of bytes received, or negative number on network error. */ -static int32_t recvExact( const MQTTContext_t * pContext, +static int32_t recvExact( MQTTContext_t * pContext, size_t bytesToRecv ); /** @@ -278,7 +278,7 @@ static int32_t recvExact( const MQTTContext_t * pContext, * * @return #MQTTRecvFailed or #MQTTNoDataAvailable. */ -static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, +static MQTTStatus_t discardPacket( MQTTContext_t * pContext, size_t remainingLength, uint32_t timeoutMs ); @@ -302,7 +302,7 @@ static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext, * * @return #MQTTSuccess or #MQTTRecvFailed. */ -static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, +static MQTTStatus_t receivePacket( MQTTContext_t * pContext, MQTTPacketInfo_t incomingPacket, uint32_t remainingTimeMs ); @@ -424,25 +424,21 @@ static MQTTStatus_t validateSubscribeUnsubscribeParams( const MQTTContext_t * pC * ##MQTTRecvFailed if transport recv failed; * #MQTTSuccess otherwise. */ -static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, +static MQTTStatus_t receiveConnack( MQTTContext_t * pContext, uint32_t timeoutMs, bool cleanSession, MQTTPacketInfo_t * pIncomingPacket, bool * pSessionPresent ); /** - * @brief Resends pending acks for a re-established MQTT session, or - * clears existing state records for a clean session. + * @brief Resends pending acks for a re-established MQTT session * * @param[in] pContext Initialized MQTT context. - * @param[in] sessionPresent Session present flag received from the MQTT broker. * * @return #MQTTSendFailed if transport send during resend failed; * #MQTTSuccess otherwise. */ -static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext, - bool sessionPresent ); - +static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext) /** * @brief Send the publish packet without copying the topic string and payload in @@ -823,6 +819,11 @@ static int32_t sendMessageVector( MQTTContext_t * pContext, { bytesSentOrError = sendResult; LogError( ( "sendMessageVector: Unable to send packet: Network Error." ) ); + + if(pContext->connectStatus == MQTTConnected) + { + pContext->connectStatus = MQTTDisconnectPending; + } } else { @@ -902,6 +903,11 @@ static int32_t sendBuffer( MQTTContext_t * pContext, { bytesSentOrError = sendResult; LogError( ( "sendBuffer: Unable to send packet: Network Error." ) ); + + if(pContext->connectStatus == MQTTConnected) + { + pContext->connectStatus = MQTTDisconnectPending; + } } else { @@ -962,7 +968,7 @@ static MQTTPubAckType_t getAckFromPacketType( uint8_t packetType ) /*-----------------------------------------------------------*/ -static int32_t recvExact( const MQTTContext_t * pContext, +static int32_t recvExact( MQTTContext_t * pContext, size_t bytesToRecv ) { uint8_t * pIndex = NULL; @@ -998,6 +1004,15 @@ static int32_t recvExact( const MQTTContext_t * pContext, ( long int ) bytesRecvd ) ); totalBytesRecvd = bytesRecvd; receiveError = true; + + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); + + if(pContext->connectStatus == MQTTConnected) + { + pContext->connectStatus = MQTTDisconnectPending; + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } else if( bytesRecvd > 0 ) { @@ -1039,7 +1054,7 @@ static int32_t recvExact( const MQTTContext_t * pContext, /*-----------------------------------------------------------*/ -static MQTTStatus_t discardPacket( const MQTTContext_t * pContext, +static MQTTStatus_t discardPacket( MQTTContext_t * pContext, size_t remainingLength, uint32_t timeoutMs ) { @@ -1175,7 +1190,7 @@ static MQTTStatus_t discardStoredPacket( MQTTContext_t * pContext, /*-----------------------------------------------------------*/ -static MQTTStatus_t receivePacket( const MQTTContext_t * pContext, +static MQTTStatus_t receivePacket( MQTTContext_t * pContext, MQTTPacketInfo_t incomingPacket, uint32_t remainingTimeMs ) { @@ -1272,6 +1287,7 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, uint8_t packetTypeByte = 0U; MQTTPubAckType_t packetType; MQTTFixedBuffer_t localBuffer; + MQTTConnectionStatus_t connectStatus; bool stateUpdateHookExecuted = false; uint8_t pubAckPacket[ MQTT_PUBLISH_ACK_PACKET_SIZE ]; @@ -1296,28 +1312,26 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, stateUpdateHookExecuted = true; - if( pContext->connectStatus != MQTTConnected) + connectStatus=pContext->connectStatus; + + if( connectStatus != MQTTConnected) { - status = MQTTDisconnected; + status = (connectStatus==MQTTNotConnected) ? MQTTStatusNotConnected: MQTTStatusDisconnectPending; } } if( status == MQTTSuccess ) { - // MQTT_PRE_SEND_HOOK( pContext ); - /* Here, we are not using the vector approach for efficiency. There is just one buffer * to be sent which can be achieved with a normal send call. */ sendResult = sendBuffer( pContext, localBuffer.pBuffer, MQTT_PUBLISH_ACK_PACKET_SIZE ); - if( sendResult != ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE ) + if( sendResult < ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE ) { status = MQTTSendFailed; } - - // MQTT_POST_SEND_HOOK( pContext ); } if( stateUpdateHookExecuted == true ) @@ -1326,7 +1340,6 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, MQTT_POST_STATE_UPDATE_HOOK( pContext ); } - // if( sendResult == ( int32_t ) MQTT_PUBLISH_ACK_PACKET_SIZE ) if( status == MQTTSuccess ) { pContext->controlPacketSent = true; @@ -1353,7 +1366,6 @@ static MQTTStatus_t sendPublishAcks( MQTTContext_t * pContext, "PacketSize=%lu.", ( unsigned int ) packetTypeByte, ( long int ) sendResult, MQTT_PUBLISH_ACK_PACKET_SIZE ) ); - // status = MQTTSendFailed; } } @@ -1713,6 +1725,15 @@ static MQTTStatus_t receiveSingleIteration( MQTTContext_t * pContext, { /* The receive function has failed. Bubble up the error up to the user. */ status = MQTTRecvFailed; + + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); + + if(pContext->connectStatus == MQTTConnected) + { + pContext->connectStatus = MQTTDisconnectPending; + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } else if( ( recvBytes == 0 ) && ( pContext->index == 0U ) ) { @@ -2334,7 +2355,7 @@ static MQTTStatus_t sendConnectWithoutCopy( MQTTContext_t * pContext, /*-----------------------------------------------------------*/ -static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, +static MQTTStatus_t receiveConnack( MQTTContext_t * pContext, uint32_t timeoutMs, bool cleanSession, MQTTPacketInfo_t * pIncomingPacket, @@ -2365,6 +2386,24 @@ static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, pContext->transportInterface.pNetworkContext, pIncomingPacket ); + if( status == MQTTStatusDisconnectPending ) + { + /* Convert this status to MQTTRecvFailed as MQTTStatusDisconnectPending is + * reserved for cases where we need to let the user know about the MQTT + * connection status. + */ + status = MQTTRecvFailed; + + MQTT_PRE_STATE_UPDATE_HOOK( pContext ); + + if(pContext->connectStatus == MQTTConnected) + { + pContext->connectStatus = MQTTDisconnectPending; + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); + } + /* The loop times out based on 2 conditions. * 1. If timeoutMs is greater than 0: * Loop times out based on the timeout calculated by getTime() @@ -2458,55 +2497,6 @@ static MQTTStatus_t receiveConnack( const MQTTContext_t * pContext, /*-----------------------------------------------------------*/ -static MQTTStatus_t handleSessionResumption( MQTTContext_t * pContext, - bool sessionPresent ) -{ - MQTTStatus_t status = MQTTSuccess; - MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER; - uint16_t packetId = MQTT_PACKET_ID_INVALID; - MQTTPublishState_t state = MQTTStateNull; - - assert( pContext != NULL ); - - /* Reset the index and clear the buffer when a new session is established. */ - pContext->index = 0; - ( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size ); - - if( sessionPresent == true ) - { - /* Get the next packet ID for which a PUBREL need to be resent. */ - packetId = MQTT_PubrelToResend( pContext, &cursor, &state ); - - /* Resend all the PUBREL acks after session is reestablished. */ - while( ( packetId != MQTT_PACKET_ID_INVALID ) && - ( status == MQTTSuccess ) ) - { - status = sendPublishAcks( pContext, packetId, state ); - - packetId = MQTT_PubrelToResend( pContext, &cursor, &state ); - } - } - else - { - /* Clear any existing records if a new session is established. */ - if( pContext->outgoingPublishRecordMaxCount > 0U ) - { - ( void ) memset( pContext->outgoingPublishRecords, - 0x00, - pContext->outgoingPublishRecordMaxCount * sizeof( *pContext->outgoingPublishRecords ) ); - } - - if( pContext->incomingPublishRecordMaxCount > 0U ) - { - ( void ) memset( pContext->incomingPublishRecords, - 0x00, - pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) ); - } - } - - return status; -} - static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext) { MQTTStatus_t status = MQTTSuccess; @@ -2727,7 +2717,7 @@ MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext, MQTTStatus_t MQTT_CheckConnectStatus(MQTTContext_t * pContext) { - bool isConnected; + MQTTConnectionStatus_t connectStatus; MQTTStatus_t status = MQTTSuccess; if( pContext == NULL ) @@ -2741,11 +2731,27 @@ MQTTStatus_t MQTT_CheckConnectStatus(MQTTContext_t * pContext) { MQTT_PRE_STATE_UPDATE_HOOK( pContext ); - isConnected = (pContext->connectStatus == MQTTConnected); + connectStatus = pContext->connectStatus == MQTTConnected; - MQTT_POST_STATE_UPDATE_HOOK( pContext ); + switch (connectStatus) + { + case MQTTConnected: + status = MQTTStatusConnected; + break; + + case MQTTNotConnected: + status = MQTTStatusNotConnected; + break; - status = isConnected? MQTTAlreadyConnected : MQTTDisconnected; + case MQTTDisconnectPending: + status = MQTTStatusDisconnectPending; + break; + + default: + break; + } + + MQTT_POST_STATE_UPDATE_HOOK( pContext ); } return status; @@ -2762,6 +2768,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, size_t remainingLength = 0UL, packetSize = 0UL; MQTTStatus_t status = MQTTSuccess; MQTTPacketInfo_t incomingPacket = { 0 }; + MQTTConnectionStatus_t connectStatus; bool stateUpdateHookExecuted = false; incomingPacket.type = ( uint8_t ) 0; @@ -2794,22 +2801,20 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, stateUpdateHookExecuted = true; - if( pContext->connectStatus == MQTTConnected) + connectStatus=pContext->connectStatus; + + if( connectStatus != MQTTNotConnected) { - status = MQTTAlreadyConnected; + status = (connectStatus==MQTTConnected) ? MQTTStatusConnected: MQTTStatusDisconnectPending; } } if( status == MQTTSuccess ) { - // MQTT_PRE_SEND_HOOK( pContext ); - status = sendConnectWithoutCopy( pContext, pConnectInfo, pWillInfo, remainingLength ); - - // MQTT_POST_SEND_HOOK( pContext ); } /* Read CONNACK from transport layer. */ @@ -2862,7 +2867,6 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, if( status == MQTTSuccess && *pSessionPresent == true ) { /* Resend PUBRELs when reestablishing a session */ - // status = handleSessionResumption( pContext, *pSessionPresent ); status = handleUncleanSessionResumption( pContext); } @@ -2870,9 +2874,9 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext, { LogInfo( ( "MQTT connection established with the broker." ) ); } - else if( status == MQTTAlreadyConnected ) + else if( status == MQTTStatusConnected || status == MQTTStatusDisconnectPending) { - LogInfo( ( "MQTT connection already established, return status = %s.", + LogInfo( ( "MQTT Connection is either already established or a disconnect is pending, return status = %s.", MQTT_Status_strerror( status ) ) ); } else @@ -2901,6 +2905,7 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, uint16_t packetId ) { bool stateUpdateHookExecuted = false; + MQTTConnectionStatus_t connectStatus; size_t remainingLength = 0UL, packetSize = 0UL; /* Validate arguments. */ @@ -2927,24 +2932,22 @@ MQTTStatus_t MQTT_Subscribe( MQTTContext_t * pContext, stateUpdateHookExecuted = true; - if( pContext->connectStatus != MQTTConnected) + connectStatus=pContext->connectStatus; + + if( connectStatus != MQTTConnected) { - status = MQTTDisconnected; + status = (connectStatus==MQTTNotConnected) ? MQTTStatusNotConnected: MQTTStatusDisconnectPending; } } if( status == MQTTSuccess ) { - // MQTT_PRE_SEND_HOOK( pContext ); - /* Send MQTT SUBSCRIBE packet. */ status = sendSubscribeWithoutCopy( pContext, pSubscriptionList, subscriptionCount, packetId, remainingLength ); - - // MQTT_POST_SEND_HOOK( pContext ); } if( stateUpdateHookExecuted == true ) @@ -2966,6 +2969,7 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, size_t remainingLength = 0UL; size_t packetSize = 0UL; MQTTPublishState_t publishStatus = MQTTStateNull; + MQTTConnectionStatus_t connectStatus; bool stateUpdateHookExecuted = false; /* Maximum number of bytes required by the 'fixed' part of the PUBLISH @@ -3002,22 +3006,23 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, if(status == MQTTSuccess ) { + /* Take the mutex as multiple send calls are required for sending this + * packet. */ MQTT_PRE_STATE_UPDATE_HOOK( pContext ); stateUpdateHookExecuted = true; - if( pContext->connectStatus != MQTTConnected) + connectStatus=pContext->connectStatus; + + if( connectStatus != MQTTConnected) { - status = MQTTDisconnected; + status = (connectStatus==MQTTNotConnected) ? MQTTStatusNotConnected: MQTTStatusDisconnectPending; } } if( ( status == MQTTSuccess ) && ( pPublishInfo->qos > MQTTQoS0 ) ) { - // MQTT_PRE_STATE_UPDATE_HOOK( pContext ); - /* Set the flag so that the corresponding hook can be called later. */ - // stateUpdateHookExecuted = true; status = MQTT_ReserveState( pContext, packetId, @@ -3034,18 +3039,11 @@ MQTTStatus_t MQTT_Publish( MQTTContext_t * pContext, if( status == MQTTSuccess ) { - /* Take the mutex as multiple send calls are required for sending this - * packet. */ - // MQTT_PRE_SEND_HOOK( pContext ); - status = sendPublishWithoutCopy( pContext, pPublishInfo, mqttHeader, headerSize, packetId ); - - /* Give the mutex away for the next taker. */ - // MQTT_POST_SEND_HOOK( pContext ); } if( ( status == MQTTSuccess ) && @@ -3100,6 +3098,7 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) /* MQTT ping packets are of fixed length. */ uint8_t pingreqPacket[ 2U ]; MQTTFixedBuffer_t localBuffer; + MQTTConnectionStatus_t connectStatus; bool stateUpdateHookExecuted = false; localBuffer.pBuffer = pingreqPacket; @@ -3143,16 +3142,16 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) stateUpdateHookExecuted = true; - if( pContext->connectStatus != MQTTConnected) + connectStatus=pContext->connectStatus; + + if( connectStatus != MQTTConnected) { - status = MQTTDisconnected; + status = (connectStatus==MQTTNotConnected) ? MQTTStatusNotConnected: MQTTStatusDisconnectPending; } } if( status == MQTTSuccess ) { - // MQTT_PRE_SEND_HOOK( pContext ); - /* Send the serialized PINGREQ packet to transport layer. * Here, we do not use the vectored IO approach for efficiency as the * Ping packet does not have numerous fields which need to be copied @@ -3161,9 +3160,6 @@ MQTTStatus_t MQTT_Ping( MQTTContext_t * pContext ) localBuffer.pBuffer, packetSize ); - /* Give the mutex away. */ - // MQTT_POST_SEND_HOOK( pContext ); - /* It is an error to not send the entire PINGREQ packet. */ if( sendResult < ( int32_t ) packetSize ) { @@ -3196,6 +3192,7 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, uint16_t packetId ) { bool stateUpdateHookExecuted = false; + MQTTConnectionStatus_t connectStatus; size_t remainingLength = 0UL, packetSize = 0UL; /* Validate arguments. */ @@ -3223,25 +3220,21 @@ MQTTStatus_t MQTT_Unsubscribe( MQTTContext_t * pContext, stateUpdateHookExecuted = true; - if( pContext->connectStatus != MQTTConnected) + connectStatus=pContext->connectStatus; + + if( connectStatus != MQTTConnected) { - status = MQTTDisconnected; + status = (connectStatus==MQTTNotConnected) ? MQTTStatusNotConnected: MQTTStatusDisconnectPending; } } if( status == MQTTSuccess ) { - - // MQTT_PRE_SEND_HOOK( pContext ); - status = sendUnsubscribeWithoutCopy( pContext, pSubscriptionList, subscriptionCount, packetId, remainingLength ); - - /* Give the mutex away. */ - // MQTT_POST_SEND_HOOK( pContext ); } if( stateUpdateHookExecuted == true ) @@ -3262,6 +3255,7 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) MQTTStatus_t status = MQTTSuccess; MQTTFixedBuffer_t localBuffer; uint8_t disconnectPacket[ 2U ]; + MQTTConnectionStatus_t connectStatus; bool stateUpdateHookExecuted = false; localBuffer.pBuffer = disconnectPacket; @@ -3295,9 +3289,11 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) stateUpdateHookExecuted = true; - if( pContext->connectStatus != MQTTConnected) + connectStatus=pContext->connectStatus; + + if( connectStatus == MQTTNotConnected) { - status = MQTTDisconnected; + status = MQTTStatusNotConnected; } } @@ -3312,8 +3308,6 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) LogError( ( "MQTT Connection Disconnected Successfuly" ) ); - // MQTT_PRE_SEND_HOOK( pContext ); - /* Here we do not use vectors as the disconnect packet has fixed fields * which do not reside in user provided buffers. Thus, it can be sent * using a simple send call. */ @@ -3321,9 +3315,6 @@ MQTTStatus_t MQTT_Disconnect( MQTTContext_t * pContext ) localBuffer.pBuffer, packetSize ); - /* Give the mutex away. */ - // MQTT_POST_SEND_HOOK( pContext ); - if( sendResult < ( int32_t ) packetSize ) { LogError( ( "Transport send failed for DISCONNECT packet." ) ); @@ -3611,12 +3602,16 @@ const char * MQTT_Status_strerror( MQTTStatus_t status ) str = "MQTTNeedMoreBytes"; break; - case MQTTAlreadyConnected: - str = "MQTTAlreadyConnected"; + case MQTTStatusConnected: + str = "MQTTStatusConnected"; + break; + + case MQTTStatusNotConnected: + str = "MQTTStatusNotConnected"; break; - case MQTTDisconnected: - str = "MQTTDisconnected"; + case MQTTStatusDisconnectPending: + str = "MQTTStatusDisconnectPending"; break; default: diff --git a/source/core_mqtt_serializer.c b/source/core_mqtt_serializer.c index 97022034c..ab6ac3d03 100644 --- a/source/core_mqtt_serializer.c +++ b/source/core_mqtt_serializer.c @@ -820,20 +820,24 @@ static size_t getRemainingLength( TransportRecv_t recvFunc, multiplier *= 128U; bytesDecoded++; } + else if( bytesReceived < 0) + { + remainingLength = -1; + } else { remainingLength = MQTT_REMAINING_LENGTH_INVALID; } } - if( remainingLength == MQTT_REMAINING_LENGTH_INVALID ) + if( remainingLength == MQTT_REMAINING_LENGTH_INVALID || remainingLength < 0) { break; } } while( ( encodedByte & 0x80U ) != 0U ); /* Check that the decoded remaining length conforms to the MQTT specification. */ - if( remainingLength != MQTT_REMAINING_LENGTH_INVALID ) + if( (remainingLength != MQTT_REMAINING_LENGTH_INVALID) && (remainingLength >=0) ) { expectedSize = remainingLengthEncodedSize( remainingLength ); @@ -2590,6 +2594,16 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc, { LogError( ( "Incoming packet remaining length invalid." ) ); status = MQTTBadResponse; + } + else if( pIncomingPacket->remainingLength < 0) + { + /* MQTT Connection status cannot be updated here hence bubble up + * MQTTStatusDisconnectPending status to the calling API that can update it. */ + status = MQTTStatusDisconnectPending; + } + else + { + /* Empty else MISRA 15.7 */ } } else @@ -2603,6 +2617,12 @@ MQTTStatus_t MQTT_GetIncomingPacketTypeAndLength( TransportRecv_t readFunc, { status = MQTTNoDataAvailable; } + else if( ( status != MQTTBadParameter ) && ( bytesReceived < 0 ) ) + { + /* MQTT Connection status cannot be updated here hence bubble up + * MQTTStatusDisconnectPending status to the calling API that can update it. */ + status = MQTTStatusDisconnectPending; + } /* If the input packet was valid, then any other number of bytes received is * a failure. */ diff --git a/source/include/core_mqtt.h b/source/include/core_mqtt.h index 9e5821b6c..4f8e72236 100644 --- a/source/include/core_mqtt.h +++ b/source/include/core_mqtt.h @@ -107,8 +107,9 @@ typedef void (* MQTTEventCallback_t )( struct MQTTContext * pContext, */ typedef enum MQTTConnectionStatus { - MQTTNotConnected, /**< @brief MQTT Connection is inactive. */ - MQTTConnected /**< @brief MQTT Connection is active. */ + MQTTNotConnected, /**< @brief MQTT Connection is inactive. */ + MQTTConnected, /**< @brief MQTT Connection is active. */ + MQTTDisconnectPending /**< @brief MQTT Connection needs to be disconnected as a transport error has occured. */ } MQTTConnectionStatus_t; /** @@ -420,8 +421,9 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext, * @param[in] pContext Initialized MQTT context. * * @return #MQTTBadParameter if invalid parameters are passed; - * #MQTTAlreadyConnected if the MQTT connection is established with the broker. - * #MQTTDisconnected otherwise + * #MQTTStatusConnected if the MQTT connection is established with the broker. + * #MQTTSatusNotConnected if the MQTT connection is broker. + * #MQTTSatusDisconnectPending if Transport Interface has failed and MQTT connection needs to be closed. * * Example * @code{c} diff --git a/source/include/core_mqtt_serializer.h b/source/include/core_mqtt_serializer.h index a0ef10da0..86b3c6396 100644 --- a/source/include/core_mqtt_serializer.h +++ b/source/include/core_mqtt_serializer.h @@ -96,11 +96,12 @@ typedef enum MQTTStatus MQTTIllegalState, /**< An illegal state in the state record. */ MQTTStateCollision, /**< A collision with an existing state record entry. */ MQTTKeepAliveTimeout, /**< Timeout while waiting for PINGRESP. */ - MQTTNeedMoreBytes, /**< MQTT_ProcessLoop/MQTT_ReceiveLoop has received + MQTTNeedMoreBytes, /**< MQTT_ProcessLoop/MQTT_ReceiveLoop has received incomplete data; it should be called again (probably after a delay). */ - MQTTAlreadyConnected, /**< MQTT Connection is established with the broker */ - MQTTDisconnected /**< MQTT connection is not established with the broker */ + MQTTStatusConnected, /**< MQTT connection is established with the broker */ + MQTTStatusNotConnected, /**< MQTT connection is not established with the broker */ + MQTTStatusDisconnectPending /**< Transport Interface has failed and MQTT connection needs to be closed */ } MQTTStatus_t; /**