Skip to content

Commit

Permalink
Enabling Unclean Session Publish Re-Transmits (#308)
Browse files Browse the repository at this point in the history
<!--- Title -->

Description
-----------
<!--- Describe your changes in detail. -->
This PR enables the coreMQTT library to resend unacked publishes on an
unclean session connection.

Following is a brief summary of changes:
1. Add a new API `MQTT_InitRetransmits` that will initialise the context
to handle publish retransmits on an unclean session connection
2. Add signatures of callback function pointers that users will define
in order to:
     a. copy and store outgoing publishes
b. retrieve copied publish on an unclean session connection to resend
     c. clear a copied publish when a `PUBACK`/`PUBREC` is received
     d. clear all copied publishes on a clean session connection
3. Update the API's to check if callback's are defined and implement
resend publishes as required.

Following are the specifics of the changes:
1. Add 3 new MQTTStatus_t values: MQTTPublishStoreFailed,
MQTTPublishRetrieveFailed and MQTTPublishClearAllFailed
2. Update `MQTTContext_t` to hold the callback function pointers
        a. `MQTTRetransmitStorePacket storeFunction`
        b. `MQTTRetransmitRetrievePacket retrieveFunction`
        c. `MQTTRetransmitClearPacket clearFunction`
        d. `MQTTRetransmitClearAllPackets clearAllFunction`
3. Update the `MQTT_Status_strerror` function to handle the new
`MQTTStatus_t` values
4. Add a new API function `MQTT_InitRetransmits` that will initialise
the new callback functions in the `MQTTContext_t`
5. Add this API to the core_mqtt.h file to make it available to users
6. Modify `MQTT_Publish`
a. copy the outgoing publish packet in form of an array of
`TransportOutVector_t` if the callback if defined
        b. if copy fails then bubble up  corresponding error status code
7. Modify `MQTT_ReceiveLoop`
a. on receiving a `PUBACK`/`PUBREC` clear the copy of that particular
publish after the state of the publish record has been successfully
updated, if the callback if defined
8. Modify `MQTT_Connect`
a. on a clean session clear all the copies of publishes stored if the
callback is defined
b. if clear all fails then bubble up corresponding error status code
c. on an unclean session get the packetID of the unacked publishes and
retrieve the copies of those if the callback is defined
d. if retrieve fails then bubble up corresponding error status code

Approaches Taken
---------------
- To let user know about the changes we have made we will add them to a
changelog and have a minor version bump
- To be in line with the zero copy principle in our library we chose to
provide and retrieve the publish packets for storing and resending in
form of an array of `TransportOutVector_t`
- Code is written in a way that on receiving a `PUBACK`/`PUBREC` the
copy will be cleared after the state of the publish record is changed so
that if state update fails the copy won't be cleared. Otherwise if the
state does not change and the copy is cleared then when a connection is
made with an unclean session there will be a retrieve fail as the system
is in an inconsistent state.
- We are storing the copies of the publishes with the Duplicate flag set
this is because on retrieving the packet we will get it in the form of a
`TransportOutVector_t` that holds the data in a `const` pointer which
cannot be changed after retrieving.

Pending Tasks
---------------
- [ ] Changelog
- [ ] Minor version bump
- [x] Doxygen example for the new API
- [x] Better API Names
- [x] Unit Test Updates
- [x] CBMC Proof

---------

Co-authored-by: Dakshit Babbar <dakshba@amazon.com>
Co-authored-by: GitHub Action <action@github.com>
Co-authored-by: AniruddhaKanhere <60444055+AniruddhaKanhere@users.noreply.github.com>
  • Loading branch information
4 people authored Oct 25, 2024
1 parent 86fc7d1 commit f1827d8
Show file tree
Hide file tree
Showing 8 changed files with 1,391 additions and 77 deletions.
2 changes: 2 additions & 0 deletions .github/.cSpellWords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ DLIBRARY
DNDEBUG
DUNITTEST
DUNITY
getbytesinmqttvec
getpacketid
isystem
lcov
Expand All @@ -34,6 +35,7 @@ NONDET
pylint
pytest
pyyaml
serializemqttvec
sinclude
UNACKED
unpadded
Expand Down
12 changes: 6 additions & 6 deletions docs/doxygen/include/size_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
</tr>
<tr>
<td>core_mqtt.c</td>
<td><center>4.4K</center></td>
<td><center>3.8K</center></td>
<td><center>4.9K</center></td>
<td><center>4.2K</center></td>
</tr>
<tr>
<td>core_mqtt_state.c</td>
Expand All @@ -19,12 +19,12 @@
</tr>
<tr>
<td>core_mqtt_serializer.c</td>
<td><center>2.8K</center></td>
<td><center>2.2K</center></td>
<td><center>2.9K</center></td>
<td><center>2.3K</center></td>
</tr>
<tr>
<td><b>Total estimates</b></td>
<td><b><center>8.9K</center></b></td>
<td><b><center>7.3K</center></b></td>
<td><b><center>9.5K</center></b></td>
<td><b><center>7.8K</center></b></td>
</tr>
</table>
207 changes: 200 additions & 7 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@
*/
#define CORE_MQTT_UNSUBSCRIBE_PER_TOPIC_VECTOR_LENGTH ( 2U )

struct MQTTVec
{
TransportOutVector_t * pVector; /**< Pointer to transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */
size_t vectorLen; /**< Length of the transport vector. USER SHOULD NOT ACCESS THIS DIRECTLY - IT IS AN INTERNAL DETAIL AND CAN CHANGE. */
};

/*-----------------------------------------------------------*/

/**
Expand Down Expand Up @@ -444,8 +450,10 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext );
* @brief Clears existing state records for a clean session.
*
* @param[in] pContext Initialized MQTT context.
*
* @return #MQTTSuccess always otherwise.
*/
static void handleCleanSession( MQTTContext_t * pContext );
static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext );

/**
* @brief Send the publish packet without copying the topic string and payload in
Expand All @@ -463,7 +471,7 @@ static void handleCleanSession( MQTTContext_t * pContext );
*/
static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
const uint8_t * pMqttHeader,
uint8_t * pMqttHeader,
size_t headerSize,
uint16_t packetId );

Expand Down Expand Up @@ -1597,6 +1605,15 @@ static MQTTStatus_t handlePublishAcks( MQTTContext_t * pContext,
}
}

if( ( ackType == MQTTPuback ) || ( ackType == MQTTPubrec ) )
{
if( ( status == MQTTSuccess ) &&
( pContext->clearFunction != NULL ) )
{
pContext->clearFunction( pContext, packetIdentifier );
}
}

if( status == MQTTSuccess )
{
/* Set fields of deserialized struct. */
Expand Down Expand Up @@ -2133,13 +2150,14 @@ static MQTTStatus_t sendUnsubscribeWithoutCopy( MQTTContext_t * pContext,

static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
const MQTTPublishInfo_t * pPublishInfo,
const uint8_t * pMqttHeader,
uint8_t * pMqttHeader,
size_t headerSize,
uint16_t packetId )
{
MQTTStatus_t status = MQTTSuccess;
size_t ioVectorLength;
size_t totalMessageLength;
bool dupFlagChanged = false;

/* Bytes required to encode the packet ID in an MQTT header according to
* the MQTT specification. */
Expand Down Expand Up @@ -2190,7 +2208,42 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,
totalMessageLength += pPublishInfo->payloadLength;
}

if( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength )
/* If not already set, set the dup flag before storing a copy of the publish
* this is because on retrieving back this copy we will get it in the form of an
* array of TransportOutVector_t that holds the data in a const pointer which cannot be
* changed after retrieving. */
if( pPublishInfo->dup != true )
{
MQTT_UpdateDuplicatePublishFlag( pMqttHeader, true );

dupFlagChanged = true;
}

/* store a copy of the publish for retransmission purposes */
if( ( pPublishInfo->qos > MQTTQoS0 ) &&
( pContext->storeFunction != NULL ) )
{
MQTTVec_t mqttVec;

mqttVec.pVector = pIoVector;
mqttVec.vectorLen = ioVectorLength;

if( pContext->storeFunction( pContext, packetId, &mqttVec ) != true )
{
status = MQTTPublishStoreFailed;
}
}

/* change the value of the dup flag to its original, if it was changed */
if( dupFlagChanged )
{
MQTT_UpdateDuplicatePublishFlag( pMqttHeader, false );

dupFlagChanged = false;
}

if( ( status == MQTTSuccess ) &&
( sendMessageVector( pContext, pIoVector, ioVectorLength ) != ( int32_t ) totalMessageLength ) )
{
status = MQTTSendFailed;
}
Expand Down Expand Up @@ -2477,6 +2530,8 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
uint16_t packetId = MQTT_PACKET_ID_INVALID;
MQTTPublishState_t state = MQTTStateNull;
size_t totalMessageLength;
uint8_t * pMqttPacket;

assert( pContext != NULL );

Expand All @@ -2492,17 +2547,71 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
packetId = MQTT_PubrelToResend( pContext, &cursor, &state );
}

if( ( status == MQTTSuccess ) &&
( pContext->retrieveFunction != NULL ) )
{
cursor = MQTT_STATE_CURSOR_INITIALIZER;

/* Resend all the PUBLISH for which PUBACK/PUBREC is not received
* after session is reestablished. */
do
{
packetId = MQTT_PublishToResend( pContext, &cursor );

if( packetId != MQTT_PACKET_ID_INVALID )
{
if( pContext->retrieveFunction( pContext, packetId, &pMqttPacket, &totalMessageLength ) != true )
{
status = MQTTPublishRetrieveFailed;
break;
}

MQTT_PRE_STATE_UPDATE_HOOK( pContext );

if( sendBuffer( pContext, pMqttPacket, totalMessageLength ) != ( int32_t ) totalMessageLength )
{
status = MQTTSendFailed;
}

MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
} while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) );
}

return status;
}

static void handleCleanSession( MQTTContext_t * pContext )
static MQTTStatus_t handleCleanSession( MQTTContext_t * pContext )
{
MQTTStatus_t status = MQTTSuccess;
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
uint16_t packetId = MQTT_PACKET_ID_INVALID;

assert( pContext != NULL );

/* Reset the index and clear the buffer when a new session is established. */
pContext->index = 0;
( void ) memset( pContext->networkBuffer.pBuffer, 0, pContext->networkBuffer.size );

if( pContext->clearFunction != NULL )
{
cursor = MQTT_STATE_CURSOR_INITIALIZER;

/* Resend all the PUBLISH for which PUBACK/PUBREC is not received
* after session is reestablished. */
do
{
packetId = MQTT_PublishToResend( pContext, &cursor );

if( packetId != MQTT_PACKET_ID_INVALID )
{
pContext->clearFunction( pContext, packetId );
}
} while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) );
}

if( pContext->outgoingPublishRecordMaxCount > 0U )
{
/* Clear any existing records if a new session is established. */
Expand All @@ -2517,6 +2626,8 @@ static void handleCleanSession( MQTTContext_t * pContext )
0x00,
pContext->incomingPublishRecordMaxCount * sizeof( *pContext->incomingPublishRecords ) );
}

return status;
}

static MQTTStatus_t validatePublishParams( const MQTTContext_t * pContext,
Expand Down Expand Up @@ -2681,6 +2792,46 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,

/*-----------------------------------------------------------*/

MQTTStatus_t MQTT_InitRetransmits( MQTTContext_t * pContext,
MQTTStorePacketForRetransmit storeFunction,
MQTTRetrievePacketForRetransmit retrieveFunction,
MQTTClearPacketForRetransmit clearFunction )
{
MQTTStatus_t status = MQTTSuccess;

if( pContext == NULL )
{
LogError( ( "Argument cannot be NULL: pContext=%p\n",
( void * ) pContext ) );
status = MQTTBadParameter;
}
else if( storeFunction == NULL )
{
LogError( ( "Invalid parameter: storeFunction is NULL" ) );
status = MQTTBadParameter;
}
else if( retrieveFunction == NULL )
{
LogError( ( "Invalid parameter: retrieveFunction is NULL" ) );
status = MQTTBadParameter;
}
else if( clearFunction == NULL )
{
LogError( ( "Invalid parameter: clearFunction is NULL" ) );
status = MQTTBadParameter;
}
else
{
pContext->storeFunction = storeFunction;
pContext->retrieveFunction = retrieveFunction;
pContext->clearFunction = clearFunction;
}

return status;
}

/*-----------------------------------------------------------*/

MQTTStatus_t MQTT_CancelCallback( const MQTTContext_t * pContext,
uint16_t packetId )
{
Expand Down Expand Up @@ -2820,7 +2971,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,

if( ( status == MQTTSuccess ) && ( *pSessionPresent != true ) )
{
handleCleanSession( pContext );
status = handleCleanSession( pContext );
}

if( status == MQTTSuccess )
Expand All @@ -2837,7 +2988,7 @@ MQTTStatus_t MQTT_Connect( MQTTContext_t * pContext,

if( ( status == MQTTSuccess ) && ( *pSessionPresent == true ) )
{
/* Resend PUBRELs when reestablishing a session */
/* Resend PUBRELs and PUBLISHES when reestablishing a session */
status = handleUncleanSessionResumption( pContext );
}

Expand Down Expand Up @@ -3560,6 +3711,14 @@ const char * MQTT_Status_strerror( MQTTStatus_t status )
str = "MQTTStatusDisconnectPending";
break;

case MQTTPublishStoreFailed:
str = "MQTTPublishStoreFailed";
break;

case MQTTPublishRetrieveFailed:
str = "MQTTPublishRetrieveFailed";
break;

default:
str = "Invalid MQTT Status code";
break;
Expand All @@ -3569,3 +3728,37 @@ const char * MQTT_Status_strerror( MQTTStatus_t status )
}

/*-----------------------------------------------------------*/

size_t MQTT_GetBytesInMQTTVec( MQTTVec_t * pVec )
{
size_t memoryRequired = 0;
size_t i;
TransportOutVector_t * pTransportVec = pVec->pVector;
size_t vecLen = pVec->vectorLen;

for( i = 0; i < vecLen; i++ )
{
memoryRequired += pTransportVec[ i ].iov_len;
}

return memoryRequired;
}

/*-----------------------------------------------------------*/

void MQTT_SerializeMQTTVec( uint8_t * pAllocatedMem,
MQTTVec_t * pVec )
{
TransportOutVector_t * pTransportVec = pVec->pVector;
const size_t vecLen = pVec->vectorLen;
size_t index = 0;
size_t i = 0;

for( i = 0; i < vecLen; i++ )
{
memcpy( &pAllocatedMem[ index ], pTransportVec[ i ].iov_base, pTransportVec[ i ].iov_len );
index += pTransportVec[ i ].iov_len;
}
}

/*-----------------------------------------------------------*/
Loading

0 comments on commit f1827d8

Please sign in to comment.