@ -24,26 +24,44 @@
*/
/*
* Demo for showing use of the managed MQTT API shared between multiple tasks .
* This demo uses a thread safe queue to hold commands for interacting with the
* MQTT API . There are four tasks to note in this demo :
* - A command ( main ) task for processing commands from the command queue while
* other tasks enqueue them . This task enters a loop , during which it processes
* commands from the command queue . If a termination command is received , it
* will break from the loop .
* - A publisher task for synchronous publishes . This task creates a series of
* publish operations to push to the command queue , which are then executed
* by the command task . This task uses synchronous publishes , meaning it will
* wait for each publish to complete before scheduling the next one .
* - A publisher task for asynchronous publishes . The difference between this
* task and the previous is that it will not wait for completion before
* scheduling the next publish , and checks them after all publishes have been
* enqueued . Note that the distinction between synchronous and asynchronous
* publishes is only in the behavior of the task , not in the actual publish
* command .
* - A subscriber task that creates an MQTT subscription to a topic filter
* matching the topics published on by the publishers . It loops while waiting
* for publish messages to be received .
* This demo shows how to use coreMQTT in a multithreaded environment - it does not
* yet go as far as encapsulating the MQTT library within its own agent ( or daemon )
* task - although the prvCommandLoop ( ) function demonstrates how that might be done .
* In this task prvCommandLoop ( ) is only executed from a single thread and is the
* only function that is allowed to use the coreMQTT API directly . Anything else
* needing to interact with the coreMQTT API does so by posting commands to
* prvCommandLoop ( ) via a queue . Future coreMQTT releases will build an agent into
* the library itself , and then encapsulate the queues into the implementation of a
* thread safe coreMQTT API .
*
* To use this demo with TLS set democonfigUSE_TLS to 1. To use this demo without
* TLS ( so plain text ) set democonfigUSE_TLS to 0. democonfigUSE_TLS is defined
* in demo_config . h .
*
* ! ! ! Plain text connections are only used for ease of demonstration . Do not send
* ! ! ! sensitive data on unencrypted connections . Production devices should used
* ! ! ! mutually authenticated and encrypted connections .
*
* There are four tasks to note in this demo :
* - prvMQTTDemoTask ( ) manages multiple iterations of the demo . Each iteration
* creates the other tasks , calls prvCommandLoop ( ) to handle the MQTT traffic ,
* then cleans up ready for the next iteration .
* - prvSyncPublishTask ( ) which demonstrates synchronous publishes . The task creates
* a series of publish operations that are sent over the command queue to be
* processed by prvCommandLoop ( ) , waiting for each publish to complete before
* sending the next .
* - prvAsyncPublishTask ( ) which demonstrates asynchronous publishes . Like
* prvSyncPublishTask ( ) , the task creates a series of publish operations that are
* sent over the command queue to be processed by prvCommandLoop ( ) , but unlike
* prvSyncPublishTask ( ) this task does not wait for each publish to be complete
* until after all the publish commands are sent . Note that the distinction
* between synchronous and asynchronous publishes is only in the behavior of the
* task , not in the actual publish command .
* - prvSubscribeTask ( ) which creates an MQTT subscription to a topic filter
* matching the topics published on by the two publishing tasks , and in doing so ,
* ensures the demo received a publish command back for each publish command it
* sends . It loops while waiting for publish messages to be received .
*
* Tasks can have queues to hold received publish messages , and the command task
* will push incoming publishes to the queue of each task that is subscribed to
* the incoming topic .
@ -83,70 +101,6 @@
/**
* These configuration settings are required to run the demo .
*/
# ifndef democonfigCLIENT_IDENTIFIER
/**
* @ brief The MQTT client identifier used in this example . Each client identifier
* must be unique so edit as required to ensure no two clients connecting to the
* same broker use the same client identifier .
*
* @ note Appending __TIME__ to the client id string will reduce the possibility of a
* client id collision in the broker . Note that the appended time is the compilation
* time . This client id can cause collision , if more than one instance of the same
* binary is used at the same time to connect to the broker .
*/
# define democonfigCLIENT_IDENTIFIER "testClient"__TIME__
# endif
/* Compile time error for some undefined configs, and provide default values
* for others . */
# ifndef democonfigMQTT_BROKER_ENDPOINT
# error "Please define democonfigMQTT_BROKER_ENDPOINT in demo_config.h."
# endif
# if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 )
# ifndef democonfigROOT_CA_PEM
# error "Please define Root CA certificate of the MQTT broker(democonfigROOT_CA_PEM) in demo_config.h."
# endif
/* If no username is defined, then a client certificate/key is required. */
# ifndef democonfigCLIENT_USERNAME
/*
* ! ! ! Please note democonfigCLIENT_PRIVATE_KEY_PEM in used for
* ! ! ! convenience of demonstration only . Production devices should
* ! ! ! store keys securely , such as within a secure element .
*/
# ifndef democonfigCLIENT_CERTIFICATE_PEM
# error "Please define client certificate(democonfigCLIENT_CERTIFICATE_PEM) in demo_config.h."
# endif
# ifndef democonfigCLIENT_PRIVATE_KEY_PEM
# error "Please define client private key(democonfigCLIENT_PRIVATE_KEY_PEM) in demo_config.h."
# endif
# else
/* If a username is defined, a client password also would need to be defined for
* client authentication . */
# ifndef democonfigCLIENT_PASSWORD
# error "Please define client password(democonfigCLIENT_PASSWORD) in demo_config.h for client authentication based on username / password."
# endif
/* AWS IoT MQTT broker port needs to be 443 for client authentication based on
* username / password . */
# if defined( democonfigUSE_AWS_IOT_CORE_BROKER ) && democonfigMQTT_BROKER_PORT != 443
# error "Broker port(democonfigMQTT_BROKER_PORT) should be defined as 443 in demo_config.h for client authentication based on username / password in AWS IoT Core."
# endif
# endif /* ifndef democonfigCLIENT_USERNAME */
# ifndef democonfigMQTT_BROKER_PORT
# define democonfigMQTT_BROKER_PORT ( 8883 )
# endif
# else /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */
# ifndef democonfigMQTT_BROKER_PORT
# define democonfigMQTT_BROKER_PORT ( 1883 )
# endif
# endif /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */
/**
* @ brief The size to use for the network buffer .
@ -155,73 +109,6 @@
# define mqttexampleNETWORK_BUFFER_SIZE ( 1024U )
# endif
/**
* @ brief ALPN ( Application - Layer Protocol Negotiation ) protocol name for AWS IoT MQTT .
*
* This will be used if democonfigMQTT_BROKER_PORT is configured as 443 for the AWS IoT MQTT broker .
* Please see more details about the ALPN protocol for AWS IoT MQTT endpoint
* in the link below .
* https : //aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/
*/
# define AWS_IOT_MQTT_ALPN "\x0ex-amzn-mqtt-ca"
/**
* @ brief This is the ALPN ( Application - Layer Protocol Negotiation ) string
* required by AWS IoT for password - based authentication using TCP port 443.
*/
# define AWS_IOT_CUSTOM_AUTH_ALPN "\x04mqtt"
/**
* Provide default values for undefined configuration settings .
*/
# ifndef democonfigOS_NAME
# define democonfigOS_NAME "FreeRTOS"
# endif
# ifndef democonfigOS_VERSION
# define democonfigOS_VERSION tskKERNEL_VERSION_NUMBER
# endif
# ifndef democonfigHARDWARE_PLATFORM_NAME
# define democonfigHARDWARE_PLATFORM_NAME "WinSim"
# endif
# ifndef democonfigMQTT_LIB
# define democonfigMQTT_LIB "core-mqtt@1.0.0"
# endif
/**
* @ brief The MQTT metrics string expected by AWS IoT .
*/
# define AWS_IOT_METRICS_STRING \
" ?SDK= " democonfigOS_NAME " &Version= " democonfigOS_VERSION \
" &Platform= " democonfigHARDWARE_PLATFORM_NAME " &MQTTLib= " democonfigMQTT_LIB
/**
* @ brief The length of the MQTT metrics string expected by AWS IoT .
*/
# define AWS_IOT_METRICS_STRING_LENGTH ( ( uint16_t ) ( sizeof( AWS_IOT_METRICS_STRING ) - 1 ) )
# ifdef democonfigCLIENT_USERNAME
/**
* @ brief Append the username with the metrics string if # democonfigCLIENT_USERNAME is defined .
*
* This is to support both metrics reporting and username / password based client
* authentication by AWS IoT .
*/
# define CLIENT_USERNAME_WITH_METRICS democonfigCLIENT_USERNAME AWS_IOT_METRICS_STRING
# endif
/**
* @ brief Length of client identifier .
*/
# define democonfigCLIENT_IDENTIFIER_LENGTH ( ( uint16_t ) ( sizeof( democonfigCLIENT_IDENTIFIER ) - 1 ) )
/**
* @ brief Length of MQTT server host name .
*/
# define democonfigBROKER_ENDPOINT_LENGTH ( ( uint16_t ) ( sizeof( democonfigMQTT_BROKER_ENDPOINT ) - 1 ) )
/**
* @ brief Timeout for receiving CONNACK packet in milliseconds .
@ -532,13 +419,6 @@ static BaseType_t prvSocketDisconnect( NetworkContext_t * pxNetworkContext );
*/
static void prvMQTTClientSocketWakeupCallback ( Socket_t pxSocket ) ;
/**
* @ brief Initialize context for a command .
*
* @ param [ in ] pxContext Context to initialize .
*/
static void prvInitializeCommandContext ( CommandContext_t * pxContext ) ;
/**
* @ brief Track an operation by adding it to a list , indicating it is anticipating
* an acknowledgment .
@ -768,6 +648,14 @@ static void prvMQTTDemoTask( void * pvParameters );
*/
static uint32_t prvGetTimeMs ( void ) ;
/**
* @ brief Cleans any persistent sessions that may already exist
* This demo uses a persistent session that can be re - connected if disconnected .
* Clean any lingering sessions that may exist from previous executions of the
* demo .
*/
static void prvCleanExistingPersistentSession ( void ) ;
/*-----------------------------------------------------------*/
/**
@ -1049,7 +937,7 @@ static MQTTStatus_t prvResumeSession( bool xSessionPresent )
/* Resubscribe if needed. */
if ( j > 0 )
{
prvInitializeCommandContext( & xResubscribeContext ) ;
memset( ( void * ) & xResubscribeContext , 0x00 , sizeof ( xResubscribeContext ) ) ;
xResubscribeContext . pxSubscribeInfo = pxResendSubscriptions ;
xResubscribeContext . ulSubscriptionCount = j ;
/* Set to NULL so existing queues will not be overwritten. */
@ -1221,18 +1109,6 @@ static void prvMQTTClientSocketWakeupCallback( Socket_t pxSocket )
/*-----------------------------------------------------------*/
static void prvInitializeCommandContext ( CommandContext_t * pxContext )
{
pxContext - > xIsComplete = false ;
pxContext - > pxResponseQueue = NULL ;
pxContext - > xReturnStatus = MQTTSuccess ;
pxContext - > pxPublishInfo = NULL ;
pxContext - > pxSubscribeInfo = NULL ;
pxContext - > ulSubscriptionCount = 0 ;
}
/*-----------------------------------------------------------*/
static bool prvAddAwaitingOperation ( uint16_t usPacketId ,
Command_t * pxCommand )
{
@ -1742,7 +1618,6 @@ static void prvCommandLoop( void )
{
Command_t xCommand ;
Command_t xNewCommand ;
Command_t * pxCommand ;
MQTTStatus_t xStatus = MQTTSuccess ;
static int lNumProcessed = 0 ;
bool xTerminateReceived = false ;
@ -1758,9 +1633,7 @@ static void prvCommandLoop( void )
continue ;
}
pxCommand = & xCommand ;
xStatus = prvProcessCommand ( pxCommand ) ;
xStatus = prvProcessCommand ( & xCommand ) ;
/* Add connect operation to front of queue if status was not successful. */
if ( xStatus ! = MQTTSuccess )
@ -1778,15 +1651,17 @@ static void prvCommandLoop( void )
/* Delay after sending a subscribe. This is to so that the broker
* creates a subscription for us before processing our next publish ,
* which should be immediately after this . */
if ( pxCommand - > xCommandType = = SUBSCRIBE )
* which should be immediately after this . Only required because the
* subscribe and publish commands are coming from separate tasks , which
* would not normally be the case . */
if ( xCommand . xCommandType = = SUBSCRIBE )
{
LogDebug ( ( " Sleeping for %d ms after sending SUBSCRIBE packet. " , mqttexampleSUBSCRIBE_TASK_DELAY_MS ) ) ;
vTaskDelay ( mqttexampleSUBSCRIBE_TASK_DELAY_MS ) ;
}
/* Terminate the loop if we receive the termination command. */
if ( pxCommand- > xCommandType = = TERMINATE )
if ( xCommand. xCommandType = = TERMINATE )
{
xTerminateReceived = true ;
break ;
@ -1800,8 +1675,7 @@ static void prvCommandLoop( void )
configASSERT ( xTerminateReceived = = true ) ;
LogInfo ( ( " Creating Disconnect operation. " ) ) ;
prvCreateCommand ( DISCONNECT , NULL , NULL , & xNewCommand ) ;
prvProcessCommand ( & xNewCommand ) ;
MQTT_Disconnect ( & globalMqttContext ) ;
LogInfo ( ( " Disconnected from broker. " ) ) ;
}
@ -1873,7 +1747,7 @@ void prvSyncPublishTask( void * pvParameters )
snprintf ( topicBuf , mqttexampleDEMO_BUFFER_SIZE , mqttexamplePUBLISH_TOPIC_FORMAT_STRING , " sync " , i + 1 ) ;
xPublishInfo . topicNameLength = ( uint16_t ) strlen ( topicBuf ) ;
prvInitializeCommandContext( & xContext ) ;
memset( ( void * ) & xContext , 0x00 , sizeof ( xContext ) ) ;
xContext . xTaskToNotify = xTaskGetCurrentTaskHandle ( ) ;
xContext . ulNotificationBit = 1 < < i ;
xContext . pxPublishInfo = & xPublishInfo ;
@ -1935,7 +1809,7 @@ void prvAsyncPublishTask( void * pvParameters )
for ( int i = 0 ; i < mqttexamplePUBLISH_COUNT / 2 ; i + + )
{
pxContexts [ i ] = ( CommandContext_t * ) pvPortMalloc ( sizeof ( CommandContext_t ) ) ;
prvInitializeCommandContext ( pxContexts [ i ] ) ;
memset( ( void * ) pxContexts [ i ] , 0x00 , sizeof ( pxContexts [ i ] ) ) ;
pxContexts [ i ] - > xTaskToNotify = xTaskGetCurrentTaskHandle ( ) ;
/* Set the notification bit to be the publish number. This prevents this demo
@ -2023,7 +1897,7 @@ void prvSubscribeTask( void * pvParameters )
LogInfo ( ( " Topic filter: %.*s " , xSubscribeInfo . topicFilterLength , xSubscribeInfo . pTopicFilter ) ) ;
/* Create the context and subscribe command. */
prvInitializeCommandContext( & xContext ) ;
memset( & xContext , 0x00 , sizeof ( xContext ) ) ;
xContext . pxResponseQueue = xSubscriberResponseQueue ;
xContext . xTaskToNotify = xTaskGetCurrentTaskHandle ( ) ;
xContext . ulNotificationBit = mqttexampleSUBSCRIBE_COMPLETE_BIT ;
@ -2097,7 +1971,7 @@ void prvSubscribeTask( void * pvParameters )
LogInfo ( ( " Finished receiving \n " ) ) ;
prvCreateCommand ( UNSUBSCRIBE , & xContext , prvCommandCallback , & xCommand ) ;
prvInitializeCommandContext( & xContext ) ;
memset( ( void * ) & xContext , 0x00 , sizeof ( xContext ) ) ;
xContext . pxResponseQueue = xSubscriberResponseQueue ;
xContext . xTaskToNotify = xTaskGetCurrentTaskHandle ( ) ;
xContext . ulNotificationBit = mqttexampleUNSUBSCRIBE_COMPLETE_BIT ;
@ -2131,12 +2005,38 @@ void prvSubscribeTask( void * pvParameters )
/*-----------------------------------------------------------*/
static void prvCleanExistingPersistentSession ( void )
{
BaseType_t xNetworkStatus = pdFAIL ;
MQTTStatus_t xMQTTStatus ;
/* Connect to the broker. We connect here with the "clean session" flag set
* to true in order to clear any prior state in the broker . We will disconnect
* and later form a persistent session , so that it may be resumed if the
* network suddenly disconnects . */
xNetworkStatus = prvSocketConnect ( & xNetworkContext ) ;
configASSERT ( xNetworkStatus = = pdPASS ) ;
LogInfo ( ( " Creating a clean session to clear any broker state information. " ) ) ;
xMQTTStatus = prvMQTTInit ( & globalMqttContext , & xNetworkContext ) ;
configASSERT ( xMQTTStatus = = MQTTSuccess ) ;
xMQTTStatus = prvMQTTConnect ( & globalMqttContext , true ) ;
configASSERT ( xMQTTStatus = = MQTTSuccess ) ;
/* Disconnect. */
xMQTTStatus = MQTT_Disconnect ( & globalMqttContext ) ;
configASSERT ( xMQTTStatus = = MQTTSuccess ) ;
xNetworkStatus = prvSocketDisconnect ( & xNetworkContext ) ;
configASSERT ( xNetworkStatus = = pdPASS ) ;
}
/*-----------------------------------------------------------*/
static void prvMQTTDemoTask ( void * pvParameters )
{
BaseType_t xNetworkStatus = pdFAIL ;
MQTTStatus_t xMQTTStatus ;
BaseType_t xResult = pdFALSE ;
uint32_t ulNotification = 0 ;
MQTTStatus_t xMQTTStatus ;
uint32_t ulExpectedNotifications = mqttexamplePUBLISHER_SYNC_COMPLETE_BIT |
mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT |
mqttexamplePUBLISHER_ASYNC_COMPLETE_BIT ;
@ -2156,23 +2056,10 @@ static void prvMQTTDemoTask( void * pvParameters )
* synchronization primitives . */
xDefaultResponseQueue = xQueueCreate ( 1 , sizeof ( PublishElement_t ) ) ;
/* Connect to the broker. We connect here with the "clean session" flag set
* to true in order to clear any prior state in the broker . We will disconnect
* and later form a persistent session , so that it may be resumed if the
* network suddenly disconnects . */
xNetworkStatus = prvSocketConnect ( & xNetworkContext ) ;
configASSERT ( xNetworkStatus = = pdPASS ) ;
LogInfo ( ( " Creating a clean session to clear any broker state information. " ) ) ;
xMQTTStatus = prvMQTTInit ( & globalMqttContext , & xNetworkContext ) ;
configASSERT ( xMQTTStatus = = MQTTSuccess ) ;
xMQTTStatus = prvMQTTConnect ( & globalMqttContext , true ) ;
configASSERT ( xMQTTStatus = = MQTTSuccess ) ;
/* Disconnect. */
xMQTTStatus = MQTT_Disconnect ( & globalMqttContext ) ;
configASSERT ( xMQTTStatus = = MQTTSuccess ) ;
xNetworkStatus = prvSocketDisconnect ( & xNetworkContext ) ;
configASSERT ( xNetworkStatus = = pdPASS ) ;
/* This demo uses a persistent session that can be re-connected if disconnected.
* Clean any lingering sessions that may exist from previous executions of the
* demo . */
prvCleanExistingPersistentSession ( ) ;
for ( ; ; )
{
@ -2188,8 +2075,9 @@ static void prvMQTTDemoTask( void * pvParameters )
configASSERT ( xMQTTStatus = = MQTTSuccess ) ;
configASSERT ( globalMqttContext . connectStatus = = MQTTConnected ) ;
/* Give subscriber task higher priority so the subscribe will be processed before the first publish.
* This must be less than or equal to the priority of the main task . */
/* Give subscriber task higher priority so the subscribe will be processed
* before the first publish . This must be less than or equal to the priority of
* the main task . */
xResult = xTaskCreate ( prvSubscribeTask , " Subscriber " , democonfigDEMO_STACKSIZE , NULL , tskIDLE_PRIORITY + 1 , & xSubscribeTask ) ;
configASSERT ( xResult = = pdPASS ) ;
xResult = xTaskCreate ( prvSyncPublishTask , " SyncPublisher " , democonfigDEMO_STACKSIZE , NULL , tskIDLE_PRIORITY , & xSyncPublisherTask ) ;
@ -2218,7 +2106,7 @@ static void prvMQTTDemoTask( void * pvParameters )
xNetworkStatus = prvSocketDisconnect ( & xNetworkContext ) ;
configASSERT ( xNetworkStatus = = pdPASS ) ;
LogInfo ( ( " prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u. \r \n " , xPortGetFreeHeapSize ( ) ) ) ;
LogInfo ( ( " \r \n \r \n prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u. \r \n " , xPortGetFreeHeapSize ( ) ) ) ;
LogInfo ( ( " Demo completed successfully. \r \n " ) ) ;
LogInfo ( ( " Short delay before starting the next iteration.... \r \n \r \n " ) ) ;
vTaskDelay ( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS ) ;