@ -250,6 +250,18 @@
*/
# define mqttexampleUNSUBSCRIBE_COMPLETE_BIT ( 1U << 1 )
/**
* @ brief The maximum number of loop iterations to wait before declaring failure .
*
* Each ` while ` loop waiting for a task notification will wait for a total
* number of ticks equal to ` mqttexampleDEMO_TICKS_TO_WAIT ` * this number of
* iterations before the loop exits .
*
* @ note This value should not be too small , as the reason for a long loop
* may be a loss of network connection .
*/
# define mqttexampleMAX_WAIT_ITERATIONS ( 20 )
/**
* @ brief Topic filter used by the subscriber task .
*/
@ -823,7 +835,7 @@ static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext,
{
pxResendSubscriptions [ j ] . pTopicFilter = pxSubscriptions [ i ] . pcSubscriptionFilter ;
pxResendSubscriptions [ j ] . topicFilterLength = pxSubscriptions [ i ] . usFilterLength ;
pxResendSubscriptions [ j ] . qos = MQTTQoS 0 ;
pxResendSubscriptions [ j ] . qos = MQTTQoS 1 ;
j + + ;
}
}
@ -1546,6 +1558,7 @@ void prvPublishTask( void * pvParameters )
char * payloadBuffers [ mqttexamplePUBLISH_COUNT ] ;
char * topicBuffers [ mqttexamplePUBLISH_COUNT ] ;
CommandContext_t * pxContexts [ mqttexamplePUBLISH_COUNT ] = { 0 } ;
uint32_t ulWaitCounter = 0 ;
/* We use QoS 1 so that the operation won't be counted as complete until we
* receive the publish acknowledgment . */
@ -1571,13 +1584,23 @@ void prvPublishTask( void * pvParameters )
xCommandAdded = prvAddCommandToQueue ( & xCommand ) ;
/* Ensure command was added to queue. */
configASSERT ( xCommandAdded = = pdTRUE ) ;
ulWaitCounter = 0 ;
while ( ( ulNotification & ( 1U < < i ) ) ! = ( 1U < < i ) )
{
LogInfo ( ( " Waiting for publish %d to complete. " , i + 1 ) ) ;
xTaskNotifyWait ( 0 , ( 1U < < i ) , & ulNotification , mqttexampleDEMO_TICKS_TO_WAIT ) ;
if ( + + ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError ( ( " Synchronous publish loop iteration %d "
" exceeded maximum wait time. " , ( i + 1 ) ) ) ;
break ;
}
}
configASSERT ( ( ulNotification & ( 1U < < i ) ) = = ( 1U < < i ) ) ;
LogInfo ( ( " Publish operation complete. Sleeping for %d ms. \n " , mqttexamplePUBLISH_DELAY_SYNC_MS ) ) ;
vTaskDelay ( pdMS_TO_TICKS ( mqttexamplePUBLISH_DELAY_SYNC_MS ) ) ;
}
@ -1629,12 +1652,23 @@ void prvPublishTask( void * pvParameters )
continue ;
}
ulWaitCounter = 0 ;
while ( ( ulNotification & ( 1U < < i ) ) ! = ( 1U < < i ) )
{
LogInfo ( ( " Waiting to free publish context %d. " , i + 1 ) ) ;
xTaskNotifyWait ( 0 , ( 1U < < i ) , & ulNotification , mqttexampleDEMO_TICKS_TO_WAIT ) ;
if ( + + ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError ( ( " Loop free iteration %d exceeded maximum "
" wait time. " , ( i + 1 ) ) ) ;
break ;
}
}
configASSERT ( ( ulNotification & ( 1U < i ) ) = = ( 1U < < i ) ) ;
vPortFree ( pxContexts [ i ] ) ;
vPortFree ( topicBuffers [ i ] ) ;
vPortFree ( payloadBuffers [ i ] ) ;
@ -1666,11 +1700,12 @@ void prvSubscribeTask( void * pvParameters )
uint32_t ulNotification = 0 ;
CommandContext_t xContext ;
PublishElement_t xReceivedPublish ;
uint32_t ulWaitCounter = 0 ;
/* The QoS does not affect when subscribe operations are marked completed
* as it does for publishes . Since the QoS does not impact this demo , w e
* will use QoS 0 , as it is the simples t. */
xSubscribeInfo . qos = MQTTQoS 0 ;
* as it does for publishes . However, we still use QoS 1 here so that th e
* broker will resend publishes if there is a network disconnec t. */
xSubscribeInfo . qos = MQTTQoS 1 ;
xSubscribeInfo . pTopicFilter = mqttexampleSUBSCRIBE_TOPIC_FILTER ;
xSubscribeInfo . topicFilterLength = ( uint16_t ) strlen ( xSubscribeInfo . pTopicFilter ) ;
LogInfo ( ( " Topic filter: %.*s " , xSubscribeInfo . topicFilterLength , xSubscribeInfo . pTopicFilter ) ) ;
@ -1689,13 +1724,27 @@ void prvSubscribeTask( void * pvParameters )
/* Ensure command was added to queue. */
configASSERT ( xCommandAdded = = pdTRUE ) ;
/* This demo relies on the server processing our subscription before any publishes.
* Since this demo uses multiple tasks , we do not retry failed subscriptions , as the
* server has likely already processed our first publish , and so this demo will not
* complete successfully . */
while ( ( ulNotification & mqttexampleSUBSCRIBE_COMPLETE_BIT ) ! = mqttexampleSUBSCRIBE_COMPLETE_BIT )
{
LogInfo ( ( " Waiting for subscribe operation to complete. " ) ) ;
xTaskNotifyWait ( 0 , mqttexampleSUBSCRIBE_COMPLETE_BIT , & ulNotification , mqttexampleDEMO_TICKS_TO_WAIT ) ;
if ( + + ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError ( ( " Subscribe Loop exceeded maximum wait time. " ) ) ;
break ;
}
}
configASSERT ( ( ulNotification & mqttexampleSUBSCRIBE_COMPLETE_BIT ) = = mqttexampleSUBSCRIBE_COMPLETE_BIT ) ;
configASSERT ( xContext . xReturnStatus = = MQTTSuccess ) ;
LogInfo ( ( " Operation wait complete. \n " ) ) ;
ulWaitCounter = 0 ;
while ( 1 )
{
@ -1712,6 +1761,8 @@ void prvSubscribeTask( void * pvParameters )
LogInfo ( ( " Received publish on topic %.*s " , pxReceivedPublish - > topicNameLength , pxReceivedPublish - > pTopicName ) ) ;
LogInfo ( ( " Message payload: %.*s \n " , ( int ) pxReceivedPublish - > payloadLength , ( const char * ) pxReceivedPublish - > pPayload ) ) ;
usNumReceived + + ;
/* Reset the wait counter every time a publish is received. */
ulWaitCounter = 0 ;
}
/* Break if all publishes have been received. */
@ -1720,6 +1771,17 @@ void prvSubscribeTask( void * pvParameters )
break ;
}
/* Break if we have been stuck in this loop for too long. The total wait
* here will be ( ( loop delay + queue check delay ) * ` mqttexampleMAX_WAIT_ITERATIONS ` ) .
* For example , with a 1000 ms queue delay , a 400 ms loop delay , and a
* maximum iteration of 20 , this will wait 28 seconds after receiving
* the last publish . */
if ( + + ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError ( ( " Publish receive loop exceeded maximum wait time. " ) ) ;
break ;
}
LogInfo ( ( " No messages queued, received %u publishes, sleeping for %d ms \n " ,
usNumReceived ,
mqttexampleSUBSCRIBE_TASK_DELAY_MS ) ) ;
@ -1739,13 +1801,21 @@ void prvSubscribeTask( void * pvParameters )
/* Ensure command was added to queue. */
configASSERT ( xCommandAdded = = pdTRUE ) ;
LogInfo ( ( " Starting wait on operation \n " ) ) ;
ulWaitCounter = 0 ;
while ( ( ulNotification & mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) ! = mqttexampleUNSUBSCRIBE_COMPLETE_BIT )
{
LogInfo ( ( " Waiting for unsubscribe operation to complete. " ) ) ;
xTaskNotifyWait ( 0 , mqttexampleUNSUBSCRIBE_COMPLETE_BIT , & ulNotification , mqttexampleDEMO_TICKS_TO_WAIT ) ;
if ( + + ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError ( ( " Unsubscribe loop exceeded maximum wait time. " ) ) ;
break ;
}
}
configASSERT ( ( ulNotification & mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) = = mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) ;
LogInfo ( ( " Operation wait complete. \n " ) ) ;
/* Create command to stop command loop. */
@ -1773,6 +1843,7 @@ static void prvMQTTDemoTask( void * pvParameters )
uint32_t ulNotification = 0 ;
Command_t xCommand ;
MQTTStatus_t xMQTTStatus ;
uint32_t ulWaitCounter = 0 ;
( void ) pvParameters ;
@ -1838,6 +1909,7 @@ static void prvMQTTDemoTask( void * pvParameters )
LogInfo ( ( " Running command loop " ) ) ;
prvCommandLoop ( ) ;
ulWaitCounter = 0 ;
/* Delete created tasks and queues.
* Wait for subscriber task to exit before cleaning up . */
@ -1845,15 +1917,28 @@ static void prvMQTTDemoTask( void * pvParameters )
{
LogInfo ( ( " Waiting for subscribe task to exit. " ) ) ;
xTaskNotifyWait ( 0 , mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT , & ulNotification , mqttexampleDEMO_TICKS_TO_WAIT ) ;
if ( + + ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError ( ( " Subscribe task exceeded maximum wait time. " ) ) ;
break ;
}
}
configASSERT ( ( ulNotification & mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ) = = mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ) ;
ulWaitCounter = 0 ;
/* Wait for publishing task to exit before cleaning up. */
while ( ( ulNotification & mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) ! = mqttexamplePUBLISHER_TASK_COMPLETE_BIT )
{
LogInfo ( ( " Waiting for publish task to exit. " ) ) ;
xTaskNotifyWait ( 0 , mqttexamplePUBLISHER_TASK_COMPLETE_BIT , & ulNotification , mqttexampleDEMO_TICKS_TO_WAIT ) ;
if ( + + ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS )
{
LogError ( ( " Publish task exceeded maximum wait time. " ) ) ;
break ;
}
}
configASSERT ( ( ulNotification & mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) = = mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) ;