diff --git a/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/DemoTasks/MultitaskMQTTExample.c b/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/DemoTasks/MultitaskMQTTExample.c index 81620beaa1..05b5ea333a 100644 --- a/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/DemoTasks/MultitaskMQTTExample.c +++ b/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/DemoTasks/MultitaskMQTTExample.c @@ -26,18 +26,27 @@ /* * Demo for showing use of the managed MQTT API shared between multiple tasks. * This demo uses a thread safe queue to hold commands for interacting with the - * MQTT API. A command task processes commands from the queue while other tasks - * enqueue them. This task enters a loop, during which it processes commands from - * the command queue. If a termination command is received, it will break from - * the loop. In addition to the command task, this demo uses one task for - * publishing messages to the MQTT broker and another for receiving them via - * an MQTT subscription. The publisher task creates a series of publish operations - * to push to the command queue, which are then executed by the command task. - * The subscriber task subscribes to a topic filter matching the topics published - * on by the publisher, and then loops while waiting for publish messages to be - * received. Each task has a queue to hold received publish messages, - * and the command task pushes incoming publishes to the queue of each task - * that is subscribed to the incoming topic. + * MQTT API. There are four tasks to note in this demo: + * - A command (main) task for processing commands from the command queue while + * other tasks enqueue them. This task enters a loop, during which it processes + * commands from the command queue. If a termination command is received, it + * will break from the loop. + * - A publisher task for synchronous publishes. This task creates a series of + * publish operations to push to the command queue, which are then executed + * by the command task. This task uses synchronous publishes, meaning it will + * wait for each publish to complete before scheduling the next one. + * - A publisher task for asynchronous publishes. The difference between this + * task and the previous is that it will not wait for completion before + * scheduling the next publish, and checks them after all publishes have been + * enqueued. Note that the distinction between synchronous and asynchronous + * publishes is only in the behavior of the task, not in the actual publish + * command. + * - A subscriber task that creates an MQTT subscription to a topic filter + * matching the topics published on by the publishers. It loops while waiting + * for publish messages to be received. + * Tasks can have queues to hold received publish messages, and the command task + * will push incoming publishes to the queue of each task that is subscribed to + * the incoming topic. */ /* Standard includes. */ @@ -92,19 +101,36 @@ /* Compile time error for some undefined configs, and provide default values * for others. */ #ifndef democonfigMQTT_BROKER_ENDPOINT - #error "Please define democonfigMQTT_BROKER_ENDPOINT." + #error "Please define democonfigMQTT_BROKER_ENDPOINT in demo_config.h." #endif #if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) #ifndef democonfigROOT_CA_PEM #error "Please define Root CA certificate of the MQTT broker(democonfigROOT_CA_PEM) in demo_config.h." #endif - #ifndef democonfigCLIENT_CERTIFICATE_PEM - #error "Please define client certificate(democonfigCLIENT_CERTIFICATE_PEM) in demo_config.h." - #endif - #ifndef democonfigCLIENT_PRIVATE_KEY_PEM - #error "Please define client private key(democonfigCLIENT_PRIVATE_KEY_PEM) in demo_config.h." - #endif + +/* If no username is defined, then a client certificate/key is required. */ + #ifndef democonfigCLIENT_USERNAME + #ifndef democonfigCLIENT_CERTIFICATE_PEM + #error "Please define client certificate(democonfigCLIENT_CERTIFICATE_PEM) in demo_config.h." + #endif + #ifndef democonfigCLIENT_PRIVATE_KEY_PEM + #error "Please define client private key(democonfigCLIENT_PRIVATE_KEY_PEM) in demo_config.h." + #endif + #else + +/* If a username is defined, a client password also would need to be defined for + * client authentication. */ + #ifndef democonfigCLIENT_PASSWORD + #error "Please define client password(democonfigCLIENT_PASSWORD) in demo_config.h for client authentication based on username/password." + #endif + +/* AWS IoT MQTT broker port needs to be 443 for client authentication based on + * username/password. */ + #if defined( democonfigUSE_AWS_IOT_CORE_BROKER ) && democonfigMQTT_BROKER_PORT != 443 + #error "Broker port(democonfigMQTT_BROKER_PORT) should be defined as 443 in demo_config.h for client authentication based on username/password in AWS IoT Core." + #endif + #endif /* ifndef democonfigCLIENT_USERNAME */ #ifndef democonfigMQTT_BROKER_PORT #define democonfigMQTT_BROKER_PORT ( 8883 ) @@ -122,6 +148,64 @@ #define mqttexampleNETWORK_BUFFER_SIZE ( 1024U ) #endif +/** + * @brief ALPN (Application-Layer Protocol Negotiation) protocol name for AWS IoT MQTT. + * + * This will be used if democonfigMQTT_BROKER_PORT is configured as 443 for the AWS IoT MQTT broker. + * Please see more details about the ALPN protocol for AWS IoT MQTT endpoint + * in the link below. + * https://aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/ + */ +#define AWS_IOT_MQTT_ALPN "\x0ex-amzn-mqtt-ca" + +/** + * @brief This is the ALPN (Application-Layer Protocol Negotiation) string + * required by AWS IoT for password-based authentication using TCP port 443. + */ +#define AWS_IOT_CUSTOM_AUTH_ALPN "\x04mqtt" + +/** + * Provide default values for undefined configuration settings. + */ +#ifndef democonfigOS_NAME + #define democonfigOS_NAME "FreeRTOS" +#endif + +#ifndef democonfigOS_VERSION + #define democonfigOS_VERSION tskKERNEL_VERSION_NUMBER +#endif + +#ifndef democonfigHARDWARE_PLATFORM_NAME + #define democonfigHARDWARE_PLATFORM_NAME "WinSim" +#endif + +#ifndef democonfigMQTT_LIB + #define democonfigMQTT_LIB "core-mqtt@1.0.0" +#endif + +/** + * @brief The MQTT metrics string expected by AWS IoT. + */ +#define AWS_IOT_METRICS_STRING \ + "?SDK=" democonfigOS_NAME "&Version=" democonfigOS_VERSION \ + "&Platform=" democonfigHARDWARE_PLATFORM_NAME "&MQTTLib=" democonfigMQTT_LIB + +/** + * @brief The length of the MQTT metrics string expected by AWS IoT. + */ +#define AWS_IOT_METRICS_STRING_LENGTH ( ( uint16_t ) ( sizeof( AWS_IOT_METRICS_STRING ) - 1 ) ) + +#ifdef democonfigCLIENT_USERNAME + +/** + * @brief Append the username with the metrics string if #democonfigCLIENT_USERNAME is defined. + * + * This is to support both metrics reporting and username/password based client + * authentication by AWS IoT. + */ + #define CLIENT_USERNAME_WITH_METRICS democonfigCLIENT_USERNAME AWS_IOT_METRICS_STRING +#endif + /** * @brief Length of client identifier. */ @@ -144,8 +228,12 @@ /** * @brief Timeout for MQTT_ProcessLoop function in milliseconds. + * + * This demo uses no delay for the process loop, so each invocation will run + * one iteration, and will only receive a single packet. However, if there is + * no data available on the socket, the entire socket timeout value will elapse. */ -#define mqttexamplePROCESS_LOOP_TIMEOUT_MS ( 200U ) +#define mqttexamplePROCESS_LOOP_TIMEOUT_MS ( 0U ) /** * @brief The maximum time interval in seconds which is allowed to elapse @@ -221,24 +309,29 @@ #define mqttexampleSUBSCRIBE_TASK_DELAY_MS 400U /** - * @brief Delay for the publisher task between synchronous publishes. + * @brief Delay for the synchronous publisher task between publishes. */ -#define mqttexamplePUBLISH_DELAY_SYNC_MS 500U +#define mqttexamplePUBLISH_DELAY_SYNC_MS 100U /** - * @brief Delay for the publisher task between asynchronous publishes. + * @brief Delay for the asynchronous publisher task between publishes. */ -#define mqttexamplePUBLISH_DELAY_ASYNC_MS 50U +#define mqttexamplePUBLISH_DELAY_ASYNC_MS 100U /** * @brief Notification bit indicating completion of publisher task. */ -#define mqttexamplePUBLISHER_TASK_COMPLETE_BIT ( 1U << 1 ) +#define mqttexamplePUBLISHER_SYNC_COMPLETE_BIT ( 1U << 1 ) + +/** + * @brief Notification bit indicating completion of second publisher task. + */ +#define mqttexamplePUBLISHER_ASYNC_COMPLETE_BIT ( 1U << 2 ) /** * @brief Notification bit indicating completion of subscriber task. */ -#define mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ( 1U << 2 ) +#define mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ( 1U << 3 ) /** * @brief Notification bit used by subscriber task for subscribe operation. @@ -265,17 +358,17 @@ /** * @brief Topic filter used by the subscriber task. */ -#define mqttexampleSUBSCRIBE_TOPIC_FILTER "publish/+/filter" +#define mqttexampleSUBSCRIBE_TOPIC_FILTER "filter/+/+" /** * @brief Format string used by the publisher task for topic names. */ -#define mqttexamplePUBLISH_TOPIC_FORMAT_STRING "publish/%i/filter" +#define mqttexamplePUBLISH_TOPIC_FORMAT_STRING "filter/%s/%i" /** * @brief Format string used by the publisher task for payloads. */ -#define mqttexamplePUBLISH_PAYLOAD_FORMAT "Hello World! %d" +#define mqttexamplePUBLISH_PAYLOAD_FORMAT "Hello World! %s: %d" /*-----------------------------------------------------------*/ @@ -371,20 +464,41 @@ typedef struct publishElement /*-----------------------------------------------------------*/ +/** + * @brief Initializes an MQTT context, including transport interface and + * network buffer. + * + * @param[in] pxMQTTContext MQTT Context to initialize. + * @param[in] pxNetworkContext Network context. + * + * @return `MQTTSuccess` if the initialization succeeds, else `MQTTBadParameter`. + */ +static MQTTStatus_t prvMQTTInit( MQTTContext_t * pxMQTTContext, + NetworkContext_t * pxNetworkContext ); + /** * @brief Sends an MQTT Connect packet over the already connected TCP socket. * * @param[in] pxMQTTContext MQTT context pointer. - * @param[in] xNetworkContext Network context. * @param[in] xCleanSession If a clean session should be established. * * @return `MQTTSuccess` if connection succeeds, else appropriate error code * from MQTT_Connect. */ static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext, - NetworkContext_t * pxNetworkContext, bool xCleanSession ); +/** + * @brief Resume a session by resending publishes if a session is present in + * the broker, or reestablish subscriptions if not. + * + * @param[in] xSessionPresent The session present flag from the broker. + * + * @return `MQTTSuccess` if it succeeds in resending publishes, else an + * appropriate error code from `MQTT_Publish()` + */ +static MQTTStatus_t prvResumeSession( bool xSessionPresent ); + /** * @brief Form a TCP connection to a server. * @@ -392,7 +506,7 @@ static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext, * * @return `pdPASS` if connection succeeds, else `pdFAIL`. */ -static BaseType_t prvConnectNetwork( NetworkContext_t * pxNetworkContext ); +static BaseType_t prvSocketConnect( NetworkContext_t * pxNetworkContext ); /** * @brief Disconnect a TCP connection. @@ -401,7 +515,15 @@ static BaseType_t prvConnectNetwork( NetworkContext_t * pxNetworkContext ); * * @return `pdPASS` if disconnect succeeds, else `pdFAIL`. */ -static BaseType_t prvDisconnectNetwork( NetworkContext_t * pxNetworkContext ); +static BaseType_t prvSocketDisconnect( NetworkContext_t * pxNetworkContext ); + +/** + * @brief Callback for adding a process loop call to a command queue, when data + * is available on a socket. + * + * @param[in] pxSocket Socket with data, unused. + */ +static void prvMQTTClientSocketWakeupCallback( Socket_t pxSocket ); /** * @brief Initialize context for a command. @@ -552,7 +674,7 @@ static void prvEventCallback( MQTTContext_t * pMqttContext, * function, and will re-add a process loop command every time one is processed. * This demo will exit the loop after receiving an unsubscribe operation. */ -static void prvCommandLoop(); +static void prvCommandLoop( void ); /** * @brief Common callback for commands in this demo. @@ -564,18 +686,46 @@ static void prvCommandLoop(); static void prvCommandCallback( CommandContext_t * pxContext ); /** - * @brief The task used to create various publish operations. + * @brief Wait for a task notification in a loop. + * + * @param[in] pulNotification pointer holding notification value. + * @param[in] ulExpectedBits Bits to wait for. + * @param[in] xClearBits If bits should be cleared. + * + * @return `true` if notification received without exceeding the timeout, + * else `false`. + */ +static bool prvNotificationWaitLoop( uint32_t * pulNotification, + uint32_t ulExpectedBits, + bool xClearBits ); + +/** + * @brief A task used to create publish operations, waiting for each to complete + * before creating the next one. * * This task creates a series of publish operations to push to a command queue, * which are in turn executed serially by the main task. This task demonstrates - * both synchronous execution - waiting for each publish delivery to complete - * before proceeding - and asynchronous, where it is not necessary for the - * publish operation to complete before this task resumes. + * synchronous execution, waiting for each publish delivery to complete before + * proceeding. * * @param[in] pvParameters Parameters as passed at the time of task creation. Not * used in this example. */ -void prvPublishTask( void * pvParameters ); +void prvSyncPublishTask( void * pvParameters ); + +/** + * @brief A task used to create publish operations, without waiting for + * completion between each new publish. + * + * This task creates publish operations asynchronously, meaning it will not + * wait for a publish to complete before scheduling the next one. Note there + * is no difference in the actual publish operation, only in the behavior of + * this task. + * + * @param[in] pvParameters Parameters as passed at the time of task creation. Not + * used in this example. + */ +void prvAsyncPublishTask( void * pvParameters ); /** * @brief The task used to wait for incoming publishes. @@ -618,6 +768,11 @@ static uint32_t prvGetTimeMs( void ); */ static MQTTContext_t globalMqttContext; +/** + * @brief Global Network context. + */ +static NetworkContext_t xNetworkContext; + /** * @brief List of operations that are awaiting an ack from the broker. */ @@ -643,11 +798,6 @@ static CommandContext_t xResubscribeContext; */ static QueueHandle_t xCommandQueue; -/** - * @brief Response queue for prvPublishTask. - */ -static QueueHandle_t xPublisherResponseQueue; - /** * @brief Response queue for prvSubscribeTask. */ @@ -664,9 +814,14 @@ static QueueHandle_t xDefaultResponseQueue; static TaskHandle_t xMainTask; /** - * @brief Handle for prvPublishTask. + * @brief Handle for prvSyncPublishTask. + */ +static TaskHandle_t xSyncPublisherTask; + +/** + * @brief Handle of prvAsyncPublishTask. */ -static TaskHandle_t xPublisherTask; +static TaskHandle_t xAsyncPublisherTask; /** * @brief Handle for prvSubscribeTask. @@ -676,7 +831,7 @@ static TaskHandle_t xSubscribeTask; /** * @brief The network buffer must remain valid for the lifetime of the MQTT context. */ -static uint8_t buffer[ mqttexampleNETWORK_BUFFER_SIZE ]; +static uint8_t pcNetworkBuffer[ mqttexampleNETWORK_BUFFER_SIZE ]; /** * @brief Global entry time into the application to use as a reference timestamp @@ -705,18 +860,14 @@ void vStartSimpleMQTTDemo( void ) } /*-----------------------------------------------------------*/ -static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext, - NetworkContext_t * pxNetworkContext, - bool xCleanSession ) +static MQTTStatus_t prvMQTTInit( MQTTContext_t * pxMQTTContext, + NetworkContext_t * pxNetworkContext ) { - MQTTStatus_t xResult; - MQTTConnectInfo_t xConnectInfo; - bool xSessionPresent = false; TransportInterface_t xTransport; MQTTFixedBuffer_t xNetworkBuffer; /* Fill the values for network buffer. */ - xNetworkBuffer.pBuffer = buffer; + xNetworkBuffer.pBuffer = pcNetworkBuffer; xNetworkBuffer.size = mqttexampleNETWORK_BUFFER_SIZE; /* Fill in Transport Interface send and receive function pointers. */ @@ -729,12 +880,16 @@ static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext, xTransport.recv = Plaintext_FreeRTOS_recv; #endif - if( xCleanSession ) - { - /* Initialize MQTT library. */ - xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xNetworkBuffer ); - configASSERT( xResult == MQTTSuccess ); - } + /* Initialize MQTT library. */ + return MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xNetworkBuffer ); +} + +static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext, + bool xCleanSession ) +{ + MQTTStatus_t xResult; + MQTTConnectInfo_t xConnectInfo; + bool xSessionPresent = false; /* Many fields are not used in this demo so start with everything at 0. */ memset( &xConnectInfo, 0x00, sizeof( xConnectInfo ) ); @@ -751,21 +906,59 @@ static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext, xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER; xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER ); - /* Set MQTT keep-alive period. It is the responsibility of the application to ensure - * that the interval between Control Packets being sent does not exceed the Keep Alive value. - * In the absence of sending any other Control Packets, the Client MUST send a PINGREQ Packet. */ + /* Set MQTT keep-alive period. It is the responsibility of the application + * to ensure that the interval between Control Packets being sent does not + * exceed the Keep Alive value. In the absence of sending any other Control + * Packets, the Client MUST send a PINGREQ Packet. */ xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_INTERVAL_SECONDS; - /* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it - * is passed as NULL. */ + /* Append metrics when connecting to the AWS IoT Core broker. */ + #ifdef democonfigUSE_AWS_IOT_CORE_BROKER + #ifdef democonfigCLIENT_USERNAME + xConnectInfo.pUserName = CLIENT_USERNAME_WITH_METRICS; + xConnectInfo.userNameLength = ( uint16_t ) strlen( CLIENT_USERNAME_WITH_METRICS ); + xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; + xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); + #else + xConnectInfo.pUserName = AWS_IOT_METRICS_STRING; + xConnectInfo.userNameLength = AWS_IOT_METRICS_STRING_LENGTH; + /* Password for authentication is not used. */ + xConnectInfo.pPassword = NULL; + xConnectInfo.passwordLength = 0U; + #endif + #else /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ + #ifdef democonfigCLIENT_USERNAME + xConnectInfo.pUserName = democonfigCLIENT_USERNAME; + xConnectInfo.userNameLength = ( uint16_t ) strlen( democonfigCLIENT_USERNAME ); + xConnectInfo.pPassword = democonfigCLIENT_PASSWORD; + xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD ); + #endif /* ifdef democonfigCLIENT_USERNAME */ + #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ + + /* Send MQTT CONNECT packet to broker. MQTT's Last Will and Testament feature + * is not used in this demo, so it is passed as NULL. */ xResult = MQTT_Connect( pxMQTTContext, &xConnectInfo, NULL, mqttexampleCONNACK_RECV_TIMEOUT_MS, &xSessionPresent ); - LogInfo( ( "Session present: %d", xSessionPresent ) ); - configASSERT( xResult == MQTTSuccess ); + LogInfo( ( "Session present: %d\n", xSessionPresent ) ); + + /* Resume a session if desired. */ + if( ( xResult == MQTTSuccess ) && !xCleanSession ) + { + xResult = prvResumeSession( xSessionPresent ); + } + + return xResult; +} + +/*-----------------------------------------------------------*/ + +static MQTTStatus_t prvResumeSession( bool xSessionPresent ) +{ + MQTTStatus_t xResult = MQTTSuccess; /* Resend publishes if session is present. NOTE: It's possible that some * of the operations that were in progress during the network interruption @@ -792,6 +985,12 @@ static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext, /* Set the DUP flag. */ xFoundAck.xOriginalCommand.pxCmdContext->pxPublishInfo->dup = true; xResult = MQTT_Publish( &globalMqttContext, xFoundAck.xOriginalCommand.pxCmdContext->pxPublishInfo, packetId ); + + if( xResult != MQTTSuccess ) + { + LogError( ( "Error in resending publishes. Error code=%s\n", MQTT_Status_strerror( xResult ) ) ); + break; + } } packetId = MQTT_PublishToResend( &globalMqttContext, &cursor ); @@ -802,7 +1001,7 @@ static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext, * should mark all in progress operations as errors so that the tasks that * created them can try again. Also, we will resubscribe to the filters in * the subscription list, so tasks do not unexpectedly lose their subscriptions. */ - if( !xCleanSession && !xSessionPresent ) + else { int32_t i = 0, j = 0; Command_t xNewCommand; @@ -862,9 +1061,9 @@ static MQTTStatus_t prvMQTTConnect( MQTTContext_t * pxMQTTContext, /*-----------------------------------------------------------*/ -static BaseType_t prvConnectNetwork( NetworkContext_t * pxNetworkContext ) +static BaseType_t prvSocketConnect( NetworkContext_t * pxNetworkContext ) { - bool xConnected = false; + BaseType_t xConnected = pdFAIL; RetryUtilsStatus_t xRetryUtilsStatus = RetryUtilsSuccess; RetryUtilsParams_t xReconnectParams; @@ -872,19 +1071,38 @@ static BaseType_t prvConnectNetwork( NetworkContext_t * pxNetworkContext ) TlsTransportStatus_t xNetworkStatus = TLS_TRANSPORT_CONNECT_FAILURE; NetworkCredentials_t xNetworkCredentials = { 0 }; + #ifdef democonfigUSE_AWS_IOT_CORE_BROKER + + /* ALPN protocols must be a NULL-terminated list of strings. Therefore, + * the first entry will contain the actual ALPN protocol string while the + * second entry must remain NULL. */ + char * pcAlpnProtocols[] = { NULL, NULL }; + + /* The ALPN string changes depending on whether username/password authentication is used. */ + #ifdef democonfigCLIENT_USERNAME + pcAlpnProtocols[ 0 ] = AWS_IOT_CUSTOM_AUTH_ALPN; + #else + pcAlpnProtocols[ 0 ] = AWS_IOT_MQTT_ALPN; + #endif + xNetworkCredentials.pAlpnProtos = pcAlpnProtocols; + #endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */ + /* Set the credentials for establishing a TLS connection. */ xNetworkCredentials.pRootCa = ( const unsigned char * ) democonfigROOT_CA_PEM; xNetworkCredentials.rootCaSize = sizeof( democonfigROOT_CA_PEM ); - xNetworkCredentials.pClientCert = ( const unsigned char * ) democonfigCLIENT_CERTIFICATE_PEM; - xNetworkCredentials.clientCertSize = sizeof( democonfigCLIENT_CERTIFICATE_PEM ); - xNetworkCredentials.pPrivateKey = ( const unsigned char * ) democonfigCLIENT_PRIVATE_KEY_PEM; - xNetworkCredentials.privateKeySize = sizeof( democonfigCLIENT_PRIVATE_KEY_PEM ); + #ifdef democonfigCLIENT_CERTIFICATE_PEM + xNetworkCredentials.pClientCert = ( const unsigned char * ) democonfigCLIENT_CERTIFICATE_PEM; + xNetworkCredentials.clientCertSize = sizeof( democonfigCLIENT_CERTIFICATE_PEM ); + xNetworkCredentials.pPrivateKey = ( const unsigned char * ) democonfigCLIENT_PRIVATE_KEY_PEM; + xNetworkCredentials.privateKeySize = sizeof( democonfigCLIENT_PRIVATE_KEY_PEM ); + #endif xNetworkCredentials.disableSni = democonfigDISABLE_SNI; #else /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */ PlaintextTransportStatus_t xNetworkStatus = PLAINTEXT_TRANSPORT_CONNECT_FAILURE; #endif /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */ - /* Initialize reconnect attempts and interval. */ + /* We will use a retry mechanism with an exponential backoff mechanism and + * jitter. We initialize reconnect attempts and interval here. */ xReconnectParams.maxRetryAttempts = MAX_RETRY_ATTEMPTS; RetryUtils_ParamsReset( &xReconnectParams ); @@ -897,25 +1115,27 @@ static BaseType_t prvConnectNetwork( NetworkContext_t * pxNetworkContext ) /* Establish a TCP connection with the MQTT broker. This example connects to * the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and * democonfigMQTT_BROKER_PORT at the top of this file. */ - LogInfo( ( "Create a TCP connection to %s:%d.", - democonfigMQTT_BROKER_ENDPOINT, - democonfigMQTT_BROKER_PORT ) ); - #if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) + LogInfo( ( "Creating a TLS connection to %s:%d.", + democonfigMQTT_BROKER_ENDPOINT, + democonfigMQTT_BROKER_PORT ) ); xNetworkStatus = TLS_FreeRTOS_Connect( pxNetworkContext, democonfigMQTT_BROKER_ENDPOINT, democonfigMQTT_BROKER_PORT, &xNetworkCredentials, mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS, mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ); - xConnected = ( xNetworkStatus == TLS_TRANSPORT_SUCCESS ); - #else + xConnected = ( xNetworkStatus == TLS_TRANSPORT_SUCCESS ) ? pdPASS : pdFAIL; + #else /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */ + LogInfo( ( "Creating a TCP connection to %s:%d.", + democonfigMQTT_BROKER_ENDPOINT, + democonfigMQTT_BROKER_PORT ) ); xNetworkStatus = Plaintext_FreeRTOS_Connect( pxNetworkContext, democonfigMQTT_BROKER_ENDPOINT, democonfigMQTT_BROKER_PORT, mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS, mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ); - xConnected = ( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS ); + xConnected = ( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS ) ? pdPASS : pdFAIL; #endif /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */ if( !xConnected ) @@ -928,21 +1148,40 @@ static BaseType_t prvConnectNetwork( NetworkContext_t * pxNetworkContext ) { LogError( ( "Connection to the broker failed. All attempts exhausted." ) ); } - } while( ( xConnected != true ) && ( xRetryUtilsStatus == RetryUtilsSuccess ) ); + } while( ( xConnected != pdPASS ) && ( xRetryUtilsStatus == RetryUtilsSuccess ) ); + + /* Set the socket wakeup callback. */ + if( xConnected ) + { + ( void ) FreeRTOS_setsockopt( pxNetworkContext->tcpSocket, + 0, /* Level - Unused. */ + FREERTOS_SO_WAKEUP_CALLBACK, + ( void * ) prvMQTTClientSocketWakeupCallback, + sizeof( &( prvMQTTClientSocketWakeupCallback ) ) ); + } - return ( xConnected ) ? pdPASS : pdFAIL; + return xConnected; } /*-----------------------------------------------------------*/ -static BaseType_t prvDisconnectNetwork( NetworkContext_t * pxNetworkContext ) +static BaseType_t prvSocketDisconnect( NetworkContext_t * pxNetworkContext ) { BaseType_t xDisconnected = pdFAIL; + /* Set the wakeup callback to NULL since the socket will disconnect. */ + ( void ) FreeRTOS_setsockopt( pxNetworkContext->tcpSocket, + 0, /* Level - Unused. */ + FREERTOS_SO_WAKEUP_CALLBACK, + ( void * ) NULL, + sizeof( void * ) ); + #if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) + LogInfo( ( "Disconnecting TLS connection.\n" ) ); TLS_FreeRTOS_Disconnect( pxNetworkContext ); xDisconnected = pdPASS; #else + LogInfo( ( "Disconnecting TCP connection.\n" ) ); PlaintextTransportStatus_t xNetworkStatus = PLAINTEXT_TRANSPORT_CONNECT_FAILURE; xNetworkStatus = Plaintext_FreeRTOS_Disconnect( pxNetworkContext ); xDisconnected = ( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS ) ? pdPASS : pdFAIL; @@ -952,6 +1191,29 @@ static BaseType_t prvDisconnectNetwork( NetworkContext_t * pxNetworkContext ) /*-----------------------------------------------------------*/ +static void prvMQTTClientSocketWakeupCallback( Socket_t pxSocket ) +{ + BaseType_t xResult; + Command_t xCommand; + + /* Just to avoid compiler warnings. The socket is not used but the function + * prototype cannot be changed because this is a callback function. */ + ( void ) pxSocket; + + configASSERT( xCommandQueue ); + + /* A socket used by the MQTT task may need attention. Send an event + * to the MQTT task to make sure the task is not blocked on xCommandQueue. */ + if( uxQueueMessagesWaiting( xCommandQueue ) == ( UBaseType_t ) 0 ) + { + prvCreateCommand( PROCESSLOOP, NULL, NULL, &xCommand ); + xResult = prvAddCommandToQueue( &xCommand ); + configASSERT( xResult == pdTRUE ); + } +} + +/*-----------------------------------------------------------*/ + static void prvInitializeCommandContext( CommandContext_t * pxContext ) { pxContext->xIsComplete = false; @@ -1010,7 +1272,7 @@ static AckInfo_t prvGetAwaitingOperation( uint16_t usPacketId, if( xFoundAck.usPacketId == MQTT_PACKET_ID_INVALID ) { - LogError( ( "No ack found for packet id %u.", usPacketId ) ); + LogError( ( "No ack found for packet id %u.\n", usPacketId ) ); } return xFoundAck; @@ -1037,7 +1299,7 @@ static void prvAddSubscription( const char * pcTopicFilter, /* If a subscription already exists, don't do anything. */ if( pxSubscriptions[ i ].pxResponseQueue == pxQueue ) { - LogWarn( ( "Subscription already exists." ) ); + LogWarn( ( "Subscription already exists.\n" ) ); ulAvailableIndex = mqttexampleSUBSCRIPTIONS_MAX_COUNT; break; } @@ -1152,8 +1414,10 @@ static MQTTStatus_t prvProcessCommand( Command_t * pxCommand ) switch( pxCommand->xCommandType ) { case PROCESSLOOP: + + /* The process loop will run at the end of every command, so we don't + * need to call it again here. */ LogDebug( ( "Running Process Loop." ) ); - xStatus = MQTT_ProcessLoop( &globalMqttContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); break; case PUBLISH: @@ -1226,12 +1490,12 @@ static MQTTStatus_t prvProcessCommand( Command_t * pxCommand ) case RECONNECT: /* Reconnect TCP. */ - xNetworkResult = prvDisconnectNetwork( globalMqttContext.transportInterface.pNetworkContext ); + xNetworkResult = prvSocketDisconnect( &xNetworkContext ); configASSERT( xNetworkResult == pdPASS ); - xNetworkResult = prvConnectNetwork( globalMqttContext.transportInterface.pNetworkContext ); + xNetworkResult = prvSocketConnect( &xNetworkContext ); configASSERT( xNetworkResult == pdPASS ); /* MQTT Connect with a persistent session. */ - xStatus = prvMQTTConnect( &globalMqttContext, globalMqttContext.transportInterface.pNetworkContext, false ); + xStatus = prvMQTTConnect( &globalMqttContext, false ); break; case TERMINATE: @@ -1249,7 +1513,7 @@ static MQTTStatus_t prvProcessCommand( Command_t * pxCommand ) * information. */ if( !xAckAdded ) { - LogError( ( "No memory to wait for acknowledgment for packet %u", usPacketId ) ); + LogError( ( "No memory to wait for acknowledgment for packet %u\n", usPacketId ) ); /* All operations that can wait for acks (publish, subscribe, unsubscribe) * require a context. */ @@ -1267,6 +1531,13 @@ static MQTTStatus_t prvProcessCommand( Command_t * pxCommand ) } } + /* Run a single iteration of the process loop if there were no errors and + * the MQTT connection still exists. */ + if( ( xStatus == MQTTSuccess ) && ( globalMqttContext.connectStatus == MQTTConnected ) ) + { + xStatus = MQTT_ProcessLoop( &globalMqttContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS ); + } + return xStatus; } @@ -1295,7 +1566,7 @@ static void prvHandleIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) if( xIsMatched ) { - LogDebug( ( "Adding publish to response queue for %.*s", + LogDebug( ( "Adding publish to response queue for %.*s\n", pxSubscriptions[ i ].usFilterLength, pxSubscriptions[ i ].pcSubscriptionFilter ) ); xPublishCopied = prvCopyPublishToQueue( pxPublishInfo, pxSubscriptions[ i ].pxResponseQueue ); @@ -1312,7 +1583,7 @@ static void prvHandleIncomingPublish( MQTTPublishInfo_t * pxPublishInfo ) * receive these publishes. */ if( !xRelayedPublish ) { - LogWarn( ( "Publish received on topic %.*s with no subscription.", + LogWarn( ( "Publish received on topic %.*s with no subscription.\n", pxPublishInfo->topicNameLength, pxPublishInfo->pTopicName ) ); xPublishCopied = prvCopyPublishToQueue( pxPublishInfo, xDefaultResponseQueue ); @@ -1356,7 +1627,7 @@ static void prvHandleSubscriptionAcks( MQTTPacketInfo_t * pxPacketInfo, } else { - LogError( ( "Subscription to %.*s failed.", + LogError( ( "Subscription to %.*s failed.\n", pxSubscribeInfo[ i ].topicFilterLength, pxSubscribeInfo[ i ].pTopicFilter ) ); } @@ -1432,7 +1703,7 @@ static void prvEventCallback( MQTTContext_t * pMqttContext, } else { - LogError( ( "No subscription or unsubscribe operation found matching packet id %u.", packetIdentifier ) ); + LogError( ( "No subscription or unsubscribe operation found matching packet id %u.\n", packetIdentifier ) ); } break; @@ -1447,12 +1718,12 @@ static void prvEventCallback( MQTTContext_t * pMqttContext, /* Nothing to be done from application as library handles * PINGRESP. */ LogWarn( ( "PINGRESP should not be handled by the application " - "callback when using MQTT_ProcessLoop.\n\n" ) ); + "callback when using MQTT_ProcessLoop.\n" ) ); break; /* Any other packet type is invalid. */ default: - LogError( ( "Unknown packet type received:(%02x).\n\n", + LogError( ( "Unknown packet type received:(%02x).\n", pPacketInfo->type ) ); } } @@ -1460,7 +1731,7 @@ static void prvEventCallback( MQTTContext_t * pMqttContext, /*-----------------------------------------------------------*/ -static void prvCommandLoop() +static void prvCommandLoop( void ) { Command_t xCommand; Command_t xNewCommand; @@ -1470,10 +1741,16 @@ static void prvCommandLoop() bool xTerminateReceived = false; BaseType_t xCommandAdded = pdTRUE; - /* Loop while the queue is not empty. If a process loop command exists in the - * queue, then it should never become empty as it will be re-added. */ - while( xQueueReceive( xCommandQueue, &xCommand, mqttexampleDEMO_TICKS_TO_WAIT ) != pdFALSE ) + /* Loop until we receive a terminate command. */ + for( ; ; ) { + /* If there is no command in the queue, try again. */ + if( xQueueReceive( xCommandQueue, &xCommand, mqttexampleDEMO_TICKS_TO_WAIT ) == pdFALSE ) + { + LogInfo( ( "No commands in the queue. Trying again." ) ); + continue; + } + pxCommand = &xCommand; xStatus = prvProcessCommand( pxCommand ); @@ -1481,7 +1758,7 @@ static void prvCommandLoop() /* Add connect operation to front of queue if status was not successful. */ if( xStatus != MQTTSuccess ) { - LogError( ( "MQTT operation failed with status %s", + LogError( ( "MQTT operation failed with status %s\n", MQTT_Status_strerror( xStatus ) ) ); prvCreateCommand( RECONNECT, NULL, NULL, &xNewCommand ); xCommandAdded = xQueueSendToFront( xCommandQueue, &xNewCommand, mqttexampleDEMO_TICKS_TO_WAIT ); @@ -1489,18 +1766,9 @@ static void prvCommandLoop() configASSERT( xCommandAdded == pdTRUE ); } + /* Keep a count of processed operations, for debug logs. */ lNumProcessed++; - if( pxCommand->xCommandType == PROCESSLOOP ) - { - /* Add process loop back to end of queue. */ - prvCreateCommand( PROCESSLOOP, NULL, NULL, &xNewCommand ); - xCommandAdded = prvAddCommandToQueue( &xNewCommand ); - /* Ensure the command was re-added. */ - configASSERT( xCommandAdded == pdTRUE ); - lNumProcessed--; - } - /* Delay after sending a subscribe. This is to so that the broker * creates a subscription for us before processing our next publish, * which should be immediately after this. */ @@ -1517,7 +1785,7 @@ static void prvCommandLoop() break; } - LogDebug( ( "Processed %d non-Process Loop operations.", lNumProcessed ) ); + LogDebug( ( "Processed %d operations.", lNumProcessed ) ); } /* Make sure we exited the loop due to receiving a terminate command and not @@ -1544,22 +1812,45 @@ static void prvCommandCallback( CommandContext_t * pxContext ) /*-----------------------------------------------------------*/ -void prvPublishTask( void * pvParameters ) +static bool prvNotificationWaitLoop( uint32_t * pulNotification, + uint32_t ulExpectedBits, + bool xClearBits ) +{ + uint32_t ulWaitCounter = 0U; + bool ret = true; + + configASSERT( pulNotification != NULL ); + + while( ( *pulNotification & ulExpectedBits ) != ulExpectedBits ) + { + xTaskNotifyWait( 0, + ( xClearBits ) ? ulExpectedBits : 0, + pulNotification, + mqttexampleDEMO_TICKS_TO_WAIT ); + + if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) + { + LogError( ( "Loop exceeded maximum wait time.\n" ) ); + ret = false; + break; + } + } + + return ret; +} + +/*-----------------------------------------------------------*/ + +void prvSyncPublishTask( void * pvParameters ) { ( void ) pvParameters; Command_t xCommand; MQTTPublishInfo_t xPublishInfo = { 0 }; - MQTTPublishInfo_t pxPublishes[ mqttexamplePUBLISH_COUNT ]; char payloadBuf[ mqttexampleDEMO_BUFFER_SIZE ]; char topicBuf[ mqttexampleDEMO_BUFFER_SIZE ]; CommandContext_t xContext; uint32_t ulNotification = 0U; BaseType_t xCommandAdded = pdTRUE; - /* The following arrays are used to hold pointers to dynamically allocated memory. */ - char * payloadBuffers[ mqttexamplePUBLISH_COUNT ]; - char * topicBuffers[ mqttexamplePUBLISH_COUNT ]; - CommandContext_t * pxContexts[ mqttexamplePUBLISH_COUNT ] = { 0 }; - uint32_t ulWaitCounter = 0; /* We use QoS 1 so that the operation won't be counted as complete until we * receive the publish acknowledgment. */ @@ -1567,16 +1858,15 @@ void prvPublishTask( void * pvParameters ) xPublishInfo.pTopicName = topicBuf; xPublishInfo.pPayload = payloadBuf; - /* Do synchronous publishes for first half. */ - for( int i = 0; i < mqttexamplePUBLISH_COUNT / 2; i++ ) + /* Synchronous publishes. In case mqttexamplePUBLISH_COUNT is odd, round up. */ + for( int i = 0; i < ( ( mqttexamplePUBLISH_COUNT + 1 ) / 2 ); i++ ) { - snprintf( payloadBuf, mqttexampleDEMO_BUFFER_SIZE, mqttexamplePUBLISH_PAYLOAD_FORMAT, i + 1 ); + snprintf( payloadBuf, mqttexampleDEMO_BUFFER_SIZE, mqttexamplePUBLISH_PAYLOAD_FORMAT, "Sync", i + 1 ); xPublishInfo.payloadLength = ( uint16_t ) strlen( payloadBuf ); - snprintf( topicBuf, mqttexampleDEMO_BUFFER_SIZE, mqttexamplePUBLISH_TOPIC_FORMAT_STRING, i + 1 ); + snprintf( topicBuf, mqttexampleDEMO_BUFFER_SIZE, mqttexamplePUBLISH_TOPIC_FORMAT_STRING, "sync", i + 1 ); xPublishInfo.topicNameLength = ( uint16_t ) strlen( topicBuf ); prvInitializeCommandContext( &xContext ); - xContext.pxResponseQueue = xPublisherResponseQueue; xContext.xTaskToNotify = xTaskGetCurrentTaskHandle(); xContext.ulNotificationBit = 1 << i; xContext.pxPublishInfo = &xPublishInfo; @@ -1585,19 +1875,12 @@ void prvPublishTask( void * pvParameters ) xCommandAdded = prvAddCommandToQueue( &xCommand ); /* Ensure command was added to queue. */ configASSERT( xCommandAdded == pdTRUE ); - ulWaitCounter = 0; + LogInfo( ( "Waiting for publish %d to complete.", i + 1 ) ); - while( ( ulNotification & ( 1U << i ) ) != ( 1U << i ) ) + if( prvNotificationWaitLoop( &ulNotification, ( 1U << i ), true ) != true ) { - LogInfo( ( "Waiting for publish %d to complete.", i + 1 ) ); - xTaskNotifyWait( 0, ( 1U << i ), &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); - - if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) - { - LogError( ( "Synchronous publish loop iteration %d" - " exceeded maximum wait time.", ( i + 1 ) ) ); - break; - } + LogError( ( "Synchronous publish loop iteration %d" + " exceeded maximum wait time.\n", ( i + 1 ) ) ); } configASSERT( ( ulNotification & ( 1U << i ) ) == ( 1U << i ) ); @@ -1606,23 +1889,57 @@ void prvPublishTask( void * pvParameters ) vTaskDelay( pdMS_TO_TICKS( mqttexamplePUBLISH_DELAY_SYNC_MS ) ); } - /* Asynchronous publishes for second half. Although not necessary, we use dynamic + LogInfo( ( "Finished sync publishes.\n" ) ); + + /* Clear this task's notifications. */ + xTaskNotifyStateClear( NULL ); + ulNotification = ulTaskNotifyValueClear( NULL, ~( 0U ) ); + + /* Notify main task this task has completed. */ + xTaskNotify( xMainTask, mqttexamplePUBLISHER_SYNC_COMPLETE_BIT, eSetBits ); + + /* Delete this task. */ + LogInfo( ( "Deleting Sync Publisher task." ) ); + vTaskDelete( NULL ); +} + +/*-----------------------------------------------------------*/ + +void prvAsyncPublishTask( void * pvParameters ) +{ + ( void ) pvParameters; + Command_t xCommand; + MQTTPublishInfo_t pxPublishes[ mqttexamplePUBLISH_COUNT / 2 ]; + uint32_t ulNotification = 0U; + uint32_t ulExpectedNotifications = 0U; + BaseType_t xCommandAdded = pdTRUE; + /* The following arrays are used to hold pointers to dynamically allocated memory. */ + char * payloadBuffers[ mqttexamplePUBLISH_COUNT / 2 ]; + char * topicBuffers[ mqttexamplePUBLISH_COUNT / 2 ]; + CommandContext_t * pxContexts[ mqttexamplePUBLISH_COUNT / 2 ] = { 0 }; + + /* Add a delay. The main task will not be sending publishes for this interval + * anyway, as we want to give the broker ample time to process the + * subscription. */ + vTaskDelay( mqttexampleSUBSCRIBE_TASK_DELAY_MS ); + + /* Asynchronous publishes. Although not necessary, we use dynamic * memory here to avoid declaring many static buffers. */ - for( int i = mqttexamplePUBLISH_COUNT >> 1; i < mqttexamplePUBLISH_COUNT; i++ ) + for( int i = 0; i < mqttexamplePUBLISH_COUNT / 2; i++ ) { pxContexts[ i ] = ( CommandContext_t * ) pvPortMalloc( sizeof( CommandContext_t ) ); prvInitializeCommandContext( pxContexts[ i ] ); - pxContexts[ i ]->pxResponseQueue = xPublisherResponseQueue; pxContexts[ i ]->xTaskToNotify = xTaskGetCurrentTaskHandle(); /* Set the notification bit to be the publish number. This prevents this demo * from having more than 32 publishes. If many publishes are desired, semaphores * can be used instead of task notifications. */ pxContexts[ i ]->ulNotificationBit = 1U << i; + ulExpectedNotifications |= 1U << i; payloadBuffers[ i ] = ( char * ) pvPortMalloc( mqttexampleDYNAMIC_BUFFER_SIZE ); topicBuffers[ i ] = ( char * ) pvPortMalloc( mqttexampleDYNAMIC_BUFFER_SIZE ); - snprintf( payloadBuffers[ i ], mqttexampleDYNAMIC_BUFFER_SIZE, mqttexamplePUBLISH_PAYLOAD_FORMAT, i + 1 ); - snprintf( topicBuffers[ i ], mqttexampleDYNAMIC_BUFFER_SIZE, mqttexamplePUBLISH_TOPIC_FORMAT_STRING, i + 1 ); + snprintf( payloadBuffers[ i ], mqttexampleDYNAMIC_BUFFER_SIZE, mqttexamplePUBLISH_PAYLOAD_FORMAT, "Async", i + 1 ); + snprintf( topicBuffers[ i ], mqttexampleDYNAMIC_BUFFER_SIZE, mqttexamplePUBLISH_TOPIC_FORMAT_STRING, "async", i + 1 ); /* Set publish info. */ memset( &( pxPublishes[ i ] ), 0x00, sizeof( MQTTPublishInfo_t ) ); pxPublishes[ i ].pPayload = payloadBuffers[ i ]; @@ -1639,37 +1956,23 @@ void prvPublishTask( void * pvParameters ) xCommandAdded = prvAddCommandToQueue( &xCommand ); /* Ensure command was added to queue. */ configASSERT( xCommandAdded == pdTRUE ); + /* Short delay so we do not bombard the broker with publishes. */ LogInfo( ( "Publish operation queued. Sleeping for %d ms.\n", mqttexamplePUBLISH_DELAY_ASYNC_MS ) ); vTaskDelay( pdMS_TO_TICKS( mqttexamplePUBLISH_DELAY_ASYNC_MS ) ); } - LogInfo( ( "Finished publishing\n" ) ); - - for( int i = 0; i < mqttexamplePUBLISH_COUNT; i++ ) - { - if( pxContexts[ i ] == NULL ) - { - /* Don't try to free anything that wasn't initialized. */ - continue; - } - - ulWaitCounter = 0; - - while( ( ulNotification & ( 1U << i ) ) != ( 1U << i ) ) - { - LogInfo( ( "Waiting to free publish context %d.", i + 1 ) ); - xTaskNotifyWait( 0, ( 1U << i ), &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); + LogInfo( ( "Finished async publishes.\n" ) ); - if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) - { - LogError( ( "Loop free iteration %d exceeded maximum" - " wait time.", ( i + 1 ) ) ); - break; - } - } + /* Receive all task notifications. We may receive notifications in a + * different order, so we have two loops. If all notifications have been + * received, we can break early. */ + ( void ) prvNotificationWaitLoop( &ulNotification, ulExpectedNotifications, false ); + for( int i = 0; i < mqttexamplePUBLISH_COUNT / 2; i++ ) + { configASSERT( ( ulNotification & ( 1U << i ) ) == ( 1U << i ) ); + LogInfo( ( "Freeing publish context %d.", i + 1 ) ); vPortFree( pxContexts[ i ] ); vPortFree( topicBuffers[ i ] ); vPortFree( payloadBuffers[ i ] ); @@ -1679,12 +1982,13 @@ void prvPublishTask( void * pvParameters ) /* Clear this task's notifications. */ xTaskNotifyStateClear( NULL ); + ulNotification = ulTaskNotifyValueClear( NULL, ~( 0U ) ); /* Notify main task this task has completed. */ - xTaskNotify( xMainTask, mqttexamplePUBLISHER_TASK_COMPLETE_BIT, eSetBits ); + xTaskNotify( xMainTask, mqttexamplePUBLISHER_ASYNC_COMPLETE_BIT, eSetBits ); /* Delete this task. */ - LogInfo( ( "Deleting Publisher task." ) ); + LogInfo( ( "Deleting Async Publisher task." ) ); vTaskDelete( NULL ); } @@ -1710,7 +2014,6 @@ void prvSubscribeTask( void * pvParameters ) xSubscribeInfo.pTopicFilter = mqttexampleSUBSCRIBE_TOPIC_FILTER; xSubscribeInfo.topicFilterLength = ( uint16_t ) strlen( xSubscribeInfo.pTopicFilter ); LogInfo( ( "Topic filter: %.*s", xSubscribeInfo.topicFilterLength, xSubscribeInfo.pTopicFilter ) ); - LogInfo( ( "Filter length: %d", xSubscribeInfo.topicFilterLength ) ); /* Create the context and subscribe command. */ prvInitializeCommandContext( &xContext ); @@ -1729,25 +2032,15 @@ void prvSubscribeTask( void * pvParameters ) * Since this demo uses multiple tasks, we do not retry failed subscriptions, as the * server has likely already processed our first publish, and so this demo will not * complete successfully. */ - while( ( ulNotification & mqttexampleSUBSCRIBE_COMPLETE_BIT ) != mqttexampleSUBSCRIBE_COMPLETE_BIT ) - { - LogInfo( ( "Waiting for subscribe operation to complete." ) ); - xTaskNotifyWait( 0, mqttexampleSUBSCRIBE_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); - - if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) - { - LogError( ( "Subscribe Loop exceeded maximum wait time." ) ); - break; - } - } + LogInfo( ( "Waiting for subscribe operation to complete." ) ); + ( void ) prvNotificationWaitLoop( &ulNotification, mqttexampleSUBSCRIBE_COMPLETE_BIT, true ); configASSERT( ( ulNotification & mqttexampleSUBSCRIBE_COMPLETE_BIT ) == mqttexampleSUBSCRIBE_COMPLETE_BIT ); configASSERT( xContext.xReturnStatus == MQTTSuccess ); LogInfo( ( "Operation wait complete.\n" ) ); - ulWaitCounter = 0; - while( 1 ) + for( ; ; ) { /* It is possible that there is nothing to receive from the queue, and * this is expected, as there are delays between each publish. For this @@ -1759,14 +2052,18 @@ void prvSubscribeTask( void * pvParameters ) pxReceivedPublish = &( xReceivedPublish.xPublishInfo ); pxReceivedPublish->pTopicName = ( const char * ) xReceivedPublish.pcTopicNameBuf; pxReceivedPublish->pPayload = xReceivedPublish.pcPayloadBuf; - LogInfo( ( "Received publish on topic %.*s", pxReceivedPublish->topicNameLength, pxReceivedPublish->pTopicName ) ); - LogInfo( ( "Message payload: %.*s\n", ( int ) pxReceivedPublish->payloadLength, ( const char * ) pxReceivedPublish->pPayload ) ); + LogInfo( ( "Received publish on topic %.*s\nMessage payload: %.*s\n", + pxReceivedPublish->topicNameLength, + pxReceivedPublish->pTopicName, + ( int ) pxReceivedPublish->payloadLength, + ( const char * ) pxReceivedPublish->pPayload ) ); usNumReceived++; /* Reset the wait counter every time a publish is received. */ ulWaitCounter = 0; } - /* Break if all publishes have been received. */ + /* Since this is an infinite loop, we want to break if all publishes have + * been received. */ if( usNumReceived >= mqttexamplePUBLISH_COUNT ) { break; @@ -1779,12 +2076,14 @@ void prvSubscribeTask( void * pvParameters ) * the last publish. */ if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) { - LogError( ( "Publish receive loop exceeded maximum wait time." ) ); + LogError( ( "Publish receive loop exceeded maximum wait time.\n" ) ); break; } - LogInfo( ( "No messages queued, received %u publishes, sleeping for %d ms\n", + /* Delay a bit to give more time for publish messages to be received. */ + LogInfo( ( "No messages queued, received %u publish%s, sleeping for %d ms\n", usNumReceived, + ( usNumReceived == 1 ) ? "" : "es", mqttexampleSUBSCRIBE_TASK_DELAY_MS ) ); vTaskDelay( pdMS_TO_TICKS( mqttexampleSUBSCRIBE_TASK_DELAY_MS ) ); } @@ -1801,20 +2100,9 @@ void prvSubscribeTask( void * pvParameters ) xCommandAdded = prvAddCommandToQueue( &xCommand ); /* Ensure command was added to queue. */ configASSERT( xCommandAdded == pdTRUE ); - LogInfo( ( "Starting wait on operation\n" ) ); - ulWaitCounter = 0; - while( ( ulNotification & mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) != mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) - { - LogInfo( ( "Waiting for unsubscribe operation to complete." ) ); - xTaskNotifyWait( 0, mqttexampleUNSUBSCRIBE_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); - - if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) - { - LogError( ( "Unsubscribe loop exceeded maximum wait time." ) ); - break; - } - } + LogInfo( ( "Waiting for unsubscribe operation to complete." ) ); + ( void ) prvNotificationWaitLoop( &ulNotification, mqttexampleUNSUBSCRIBE_COMPLETE_BIT, true ); configASSERT( ( ulNotification & mqttexampleUNSUBSCRIBE_COMPLETE_BIT ) == mqttexampleUNSUBSCRIBE_COMPLETE_BIT ); LogInfo( ( "Operation wait complete.\n" ) ); @@ -1838,13 +2126,13 @@ void prvSubscribeTask( void * pvParameters ) static void prvMQTTDemoTask( void * pvParameters ) { - NetworkContext_t xNetworkContext = { 0 }; BaseType_t xNetworkStatus = pdFAIL; BaseType_t xResult = pdFALSE; uint32_t ulNotification = 0; - Command_t xCommand; MQTTStatus_t xMQTTStatus; - uint32_t ulWaitCounter = 0; + uint32_t ulExpectedNotifications = mqttexamplePUBLISHER_SYNC_COMPLETE_BIT | + mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT | + mqttexamplePUBLISHER_ASYNC_COMPLETE_BIT; ( void ) pvParameters; @@ -1854,31 +2142,29 @@ static void prvMQTTDemoTask( void * pvParameters ) xCommandQueue = xQueueCreate( mqttexampleCOMMAND_QUEUE_SIZE, sizeof( Command_t ) ); /* Create response queues for each task. */ xSubscriberResponseQueue = xQueueCreate( mqttexamplePUBLISH_QUEUE_SIZE, sizeof( PublishElement_t ) ); - /* Publish task doesn't receive anything in this demo, so it doesn't need a large queue. */ - xPublisherResponseQueue = xQueueCreate( 1, sizeof( PublishElement_t ) ); /* In this demo, send publishes on non-subscribed topics to this queue. * Note that this value is not meant to be changed after `prvCommandLoop` has * been called, since access to this variable is not protected by thread * synchronization primitives. */ - xDefaultResponseQueue = xPublisherResponseQueue; + xDefaultResponseQueue = xQueueCreate( 1, sizeof( PublishElement_t ) ); /* Connect to the broker. We connect here with the "clean session" flag set * to true in order to clear any prior state in the broker. We will disconnect * and later form a persistent session, so that it may be resumed if the * network suddenly disconnects. */ - LogInfo( ( "Creating a TCP connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); - xNetworkStatus = prvConnectNetwork( &xNetworkContext ); + xNetworkStatus = prvSocketConnect( &xNetworkContext ); configASSERT( xNetworkStatus == pdPASS ); - LogInfo( ( "Clearing broker state." ) ); - xMQTTStatus = prvMQTTConnect( &globalMqttContext, &xNetworkContext, true ); + LogInfo( ( "Creating a clean session to clear any broker state information." ) ); + xMQTTStatus = prvMQTTInit( &globalMqttContext, &xNetworkContext ); + configASSERT( xMQTTStatus == MQTTSuccess ); + xMQTTStatus = prvMQTTConnect( &globalMqttContext, true ); configASSERT( xMQTTStatus == MQTTSuccess ); /* Disconnect. */ xMQTTStatus = MQTT_Disconnect( &globalMqttContext ); configASSERT( xMQTTStatus == MQTTSuccess ); - LogInfo( ( "Disconnecting TCP connection." ) ); - xNetworkStatus = prvDisconnectNetwork( &xNetworkContext ); + xNetworkStatus = prvSocketDisconnect( &xNetworkContext ); configASSERT( xNetworkStatus == pdPASS ); for( ; ; ) @@ -1887,17 +2173,11 @@ static void prvMQTTDemoTask( void * pvParameters ) memset( pxPendingAcks, 0x00, mqttexamplePENDING_ACKS_MAX_SIZE * sizeof( AckInfo_t ) ); memset( pxSubscriptions, 0x00, mqttexampleSUBSCRIPTIONS_MAX_COUNT * sizeof( SubscriptionElement_t ) ); - /* Create inital process loop command. */ - prvCreateCommand( PROCESSLOOP, NULL, NULL, &xCommand ); - xResult = prvAddCommandToQueue( &xCommand ); - configASSERT( xResult == pdTRUE ); - - LogInfo( ( "Creating a TCP connection to %s.\r\n", democonfigMQTT_BROKER_ENDPOINT ) ); /* Connect to the broker. */ - xNetworkStatus = prvConnectNetwork( &xNetworkContext ); + xNetworkStatus = prvSocketConnect( &xNetworkContext ); configASSERT( xNetworkStatus == pdPASS ); /* Form an MQTT connection with a persistent session. */ - xMQTTStatus = prvMQTTConnect( &globalMqttContext, &xNetworkContext, false ); + xMQTTStatus = prvMQTTConnect( &globalMqttContext, false ); configASSERT( xMQTTStatus == MQTTSuccess ); configASSERT( globalMqttContext.connectStatus == MQTTConnected ); @@ -1905,52 +2185,30 @@ static void prvMQTTDemoTask( void * pvParameters ) * This must be less than or equal to the priority of the main task. */ xResult = xTaskCreate( prvSubscribeTask, "Subscriber", democonfigDEMO_STACKSIZE, NULL, tskIDLE_PRIORITY + 1, &xSubscribeTask ); configASSERT( xResult == pdPASS ); - xResult = xTaskCreate( prvPublishTask, "Publisher", democonfigDEMO_STACKSIZE, NULL, tskIDLE_PRIORITY, &xPublisherTask ); + xResult = xTaskCreate( prvSyncPublishTask, "SyncPublisher", democonfigDEMO_STACKSIZE, NULL, tskIDLE_PRIORITY, &xSyncPublisherTask ); + configASSERT( xResult == pdPASS ); + xResult = xTaskCreate( prvAsyncPublishTask, "AsyncPublisher", democonfigDEMO_STACKSIZE, NULL, tskIDLE_PRIORITY, &xAsyncPublisherTask ); configASSERT( xResult == pdPASS ); LogInfo( ( "Running command loop" ) ); prvCommandLoop(); - ulWaitCounter = 0; - /* Delete created tasks and queues. - * Wait for subscriber task to exit before cleaning up. */ - while( ( ulNotification & mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ) != mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ) - { - LogInfo( ( "Waiting for subscribe task to exit." ) ); - xTaskNotifyWait( 0, mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); + /* Delete created queues. Wait for tasks to exit before cleaning up. */ + LogInfo( ( "Waiting for tasks to exit." ) ); + ( void ) prvNotificationWaitLoop( &ulNotification, ulExpectedNotifications, false ); - if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) - { - LogError( ( "Subscribe task exceeded maximum wait time." ) ); - break; - } - } - - configASSERT( ( ulNotification & mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ) == mqttexampleSUBSCRIBE_TASK_COMPLETE_BIT ); - ulWaitCounter = 0; - - /* Wait for publishing task to exit before cleaning up. */ - while( ( ulNotification & mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) != mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) - { - LogInfo( ( "Waiting for publish task to exit." ) ); - xTaskNotifyWait( 0, mqttexamplePUBLISHER_TASK_COMPLETE_BIT, &ulNotification, mqttexampleDEMO_TICKS_TO_WAIT ); - - if( ++ulWaitCounter > mqttexampleMAX_WAIT_ITERATIONS ) - { - LogError( ( "Publish task exceeded maximum wait time." ) ); - break; - } - } - - configASSERT( ( ulNotification & mqttexamplePUBLISHER_TASK_COMPLETE_BIT ) == mqttexamplePUBLISHER_TASK_COMPLETE_BIT ); + configASSERT( ( ulNotification & ulExpectedNotifications ) == ulExpectedNotifications ); /* Reset queues. */ xQueueReset( xCommandQueue ); - xQueueReset( xPublisherResponseQueue ); + xQueueReset( xDefaultResponseQueue ); xQueueReset( xSubscriberResponseQueue ); - LogInfo( ( "Disconnecting TCP connection." ) ); - xNetworkStatus = prvDisconnectNetwork( &xNetworkContext ); + /* Clear task notifications. */ + ulNotification = ulTaskNotifyValueClear( NULL, ~( 0U ) ); + + /* Disconnect. */ + xNetworkStatus = prvSocketDisconnect( &xNetworkContext ); configASSERT( xNetworkStatus == pdPASS ); LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. Total free heap is %u.\r\n", xPortGetFreeHeapSize() ) ); diff --git a/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/FreeRTOSIPConfig.h b/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/FreeRTOSIPConfig.h index 68e49baccb..c0d6e17a8e 100644 --- a/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/FreeRTOSIPConfig.h +++ b/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/FreeRTOSIPConfig.h @@ -124,7 +124,7 @@ extern UBaseType_t uxRand(); * is not set to 1 then the network event hook will never be called. See * http://www.FreeRTOS.org/FreeRTOS-Plus/FreeRTOS_Plus_UDP/API/vApplicationIPNetworkEventHook.shtml */ -#define ipconfigUSE_NETWORK_EVENT_HOOK 1 +#define ipconfigUSE_NETWORK_EVENT_HOOK 1 /* Sockets have a send block time attribute. If FreeRTOS_sendto() is called but * a network buffer cannot be obtained then the calling task is held in the Blocked @@ -138,7 +138,7 @@ extern UBaseType_t uxRand(); * ipconfigMAX_SEND_BLOCK_TIME_TICKS is specified in RTOS ticks. A time in * milliseconds can be converted to a time in ticks by dividing the time in * milliseconds by portTICK_PERIOD_MS. */ -#define ipconfigUDP_MAX_SEND_BLOCK_TIME_TICKS ( 5000 / portTICK_PERIOD_MS ) +#define ipconfigUDP_MAX_SEND_BLOCK_TIME_TICKS ( 5000 / portTICK_PERIOD_MS ) /* If ipconfigUSE_DHCP is 1 then FreeRTOS+TCP will attempt to retrieve an IP * address, netmask, DNS server address and gateway address from a DHCP server. If @@ -147,7 +147,7 @@ extern UBaseType_t uxRand(); * set to 1 if a valid configuration cannot be obtained from a DHCP server for any * reason. The static configuration used is that passed into the stack by the * FreeRTOS_IPInit() function call. */ -#define ipconfigUSE_DHCP 1 +#define ipconfigUSE_DHCP 1 /* When ipconfigUSE_DHCP is set to 1, DHCP requests will be sent out at * increasing time intervals until either a reply is received from a DHCP server @@ -156,7 +156,7 @@ extern UBaseType_t uxRand(); * static IP address passed as a parameter to FreeRTOS_IPInit() if the * re-transmission time interval reaches ipconfigMAXIMUM_DISCOVER_TX_PERIOD without * a DHCP reply being received. */ -#define ipconfigMAXIMUM_DISCOVER_TX_PERIOD ( 120000 / portTICK_PERIOD_MS ) +#define ipconfigMAXIMUM_DISCOVER_TX_PERIOD ( 120000 / portTICK_PERIOD_MS ) /* The ARP cache is a table that maps IP addresses to MAC addresses. The IP * stack can only send a UDP message to a remove IP address if it knowns the MAC @@ -167,19 +167,19 @@ extern UBaseType_t uxRand(); * cache then the UDP message is replaced by a ARP message that solicits the * required MAC address information. ipconfigARP_CACHE_ENTRIES defines the maximum * number of entries that can exist in the ARP table at any one time. */ -#define ipconfigARP_CACHE_ENTRIES 6 +#define ipconfigARP_CACHE_ENTRIES 6 /* ARP requests that do not result in an ARP response will be re-transmitted a * maximum of ipconfigMAX_ARP_RETRANSMISSIONS times before the ARP request is * aborted. */ -#define ipconfigMAX_ARP_RETRANSMISSIONS ( 5 ) +#define ipconfigMAX_ARP_RETRANSMISSIONS ( 5 ) /* ipconfigMAX_ARP_AGE defines the maximum time between an entry in the ARP * table being created or refreshed and the entry being removed because it is stale. * New ARP requests are sent for ARP cache entries that are nearing their maximum * age. ipconfigMAX_ARP_AGE is specified in tens of seconds, so a value of 150 is * equal to 1500 seconds (or 25 minutes). */ -#define ipconfigMAX_ARP_AGE 150 +#define ipconfigMAX_ARP_AGE 150 /* Implementing FreeRTOS_inet_addr() necessitates the use of string handling * routines, which are relatively large. To save code space the full @@ -191,19 +191,19 @@ extern UBaseType_t uxRand(); * ipconfigINCLUDE_FULL_INET_ADDR is set to 1 then both FreeRTOS_inet_addr() and * FreeRTOS_indet_addr_quick() are available. If ipconfigINCLUDE_FULL_INET_ADDR is * not set to 1 then only FreeRTOS_indet_addr_quick() is available. */ -#define ipconfigINCLUDE_FULL_INET_ADDR 1 +#define ipconfigINCLUDE_FULL_INET_ADDR 1 /* ipconfigNUM_NETWORK_BUFFER_DESCRIPTORS defines the total number of network buffer that * are available to the IP stack. The total number of network buffers is limited * to ensure the total amount of RAM that can be consumed by the IP stack is capped * to a pre-determinable value. */ -#define ipconfigNUM_NETWORK_BUFFER_DESCRIPTORS 60 +#define ipconfigNUM_NETWORK_BUFFER_DESCRIPTORS 60 /* A FreeRTOS queue is used to send events from application tasks to the IP * stack. ipconfigEVENT_QUEUE_LENGTH sets the maximum number of events that can * be queued for processing at any one time. The event queue must be a minimum of * 5 greater than the total number of network buffers. */ -#define ipconfigEVENT_QUEUE_LENGTH ( ipconfigNUM_NETWORK_BUFFER_DESCRIPTORS + 5 ) +#define ipconfigEVENT_QUEUE_LENGTH ( ipconfigNUM_NETWORK_BUFFER_DESCRIPTORS + 5 ) /* The address of a socket is the combination of its IP address and its port * number. FreeRTOS_bind() is used to manually allocate a port number to a socket @@ -217,48 +217,48 @@ extern UBaseType_t uxRand(); * ipconfigALLOW_SOCKET_SEND_WITHOUT_BIND is set to 0 then calling FreeRTOS_sendto() * on a socket that has not yet been bound will result in the send operation being * aborted. */ -#define ipconfigALLOW_SOCKET_SEND_WITHOUT_BIND 1 +#define ipconfigALLOW_SOCKET_SEND_WITHOUT_BIND 1 /* Defines the Time To Live (TTL) values used in outgoing UDP packets. */ -#define ipconfigUDP_TIME_TO_LIVE 128 -#define ipconfigTCP_TIME_TO_LIVE 128 /* also defined in FreeRTOSIPConfigDefaults.h */ +#define ipconfigUDP_TIME_TO_LIVE 128 +#define ipconfigTCP_TIME_TO_LIVE 128 /* also defined in FreeRTOSIPConfigDefaults.h */ /* USE_TCP: Use TCP and all its features */ -#define ipconfigUSE_TCP ( 1 ) +#define ipconfigUSE_TCP ( 1 ) /* Use the TCP socket wake context with a callback. */ -#define ipconfigSOCKET_HAS_USER_WAKE_CALLBACK_WITH_CONTEXT ( 1 ) +#define ipconfigSOCKET_HAS_USER_WAKE_CALLBACK ( 1 ) /* USE_WIN: Let TCP use windowing mechanism. */ -#define ipconfigUSE_TCP_WIN ( 1 ) +#define ipconfigUSE_TCP_WIN ( 1 ) /* The MTU is the maximum number of bytes the payload of a network frame can * contain. For normal Ethernet V2 frames the maximum MTU is 1500. Setting a * lower value can save RAM, depending on the buffer management scheme used. If * ipconfigCAN_FRAGMENT_OUTGOING_PACKETS is 1 then (ipconfigNETWORK_MTU - 28) must * be divisible by 8. */ -#define ipconfigNETWORK_MTU 1200 +#define ipconfigNETWORK_MTU 1200 /* Set ipconfigUSE_DNS to 1 to include a basic DNS client/resolver. DNS is used * through the FreeRTOS_gethostbyname() API function. */ -#define ipconfigUSE_DNS 1 +#define ipconfigUSE_DNS 1 /* If ipconfigREPLY_TO_INCOMING_PINGS is set to 1 then the IP stack will * generate replies to incoming ICMP echo (ping) requests. */ -#define ipconfigREPLY_TO_INCOMING_PINGS 1 +#define ipconfigREPLY_TO_INCOMING_PINGS 1 /* If ipconfigSUPPORT_OUTGOING_PINGS is set to 1 then the * FreeRTOS_SendPingRequest() API function is available. */ -#define ipconfigSUPPORT_OUTGOING_PINGS 0 +#define ipconfigSUPPORT_OUTGOING_PINGS 0 /* If ipconfigSUPPORT_SELECT_FUNCTION is set to 1 then the FreeRTOS_select() * (and associated) API function is available. */ -#define ipconfigSUPPORT_SELECT_FUNCTION 1 +#define ipconfigSUPPORT_SELECT_FUNCTION 1 /* If ipconfigFILTER_OUT_NON_ETHERNET_II_FRAMES is set to 1 then Ethernet frames * that are not in Ethernet II format will be dropped. This option is included for * potential future IP stack developments. */ -#define ipconfigFILTER_OUT_NON_ETHERNET_II_FRAMES 1 +#define ipconfigFILTER_OUT_NON_ETHERNET_II_FRAMES 1 /* If ipconfigETHERNET_DRIVER_FILTERS_FRAME_TYPES is set to 1 then it is the * responsibility of the Ethernet interface to filter out packets that are of no @@ -268,30 +268,30 @@ extern UBaseType_t uxRand(); * because the packet will already have been passed into the stack). If the * Ethernet driver does all the necessary filtering in hardware then software * filtering can be removed by using a value other than 1 or 0. */ -#define ipconfigETHERNET_DRIVER_FILTERS_FRAME_TYPES 1 +#define ipconfigETHERNET_DRIVER_FILTERS_FRAME_TYPES 1 /* The windows simulator cannot really simulate MAC interrupts, and needs to * block occasionally to allow other tasks to run. */ -#define configWINDOWS_MAC_INTERRUPT_SIMULATOR_DELAY ( 20 / portTICK_PERIOD_MS ) +#define configWINDOWS_MAC_INTERRUPT_SIMULATOR_DELAY ( 20 / portTICK_PERIOD_MS ) /* Advanced only: in order to access 32-bit fields in the IP packets with * 32-bit memory instructions, all packets will be stored 32-bit-aligned, plus 16-bits. * This has to do with the contents of the IP-packets: all 32-bit fields are * 32-bit-aligned, plus 16-bit(!) */ -#define ipconfigPACKET_FILLER_SIZE 2 +#define ipconfigPACKET_FILLER_SIZE 2 /* Define the size of the pool of TCP window descriptors. On the average, each * TCP socket will use up to 2 x 6 descriptors, meaning that it can have 2 x 6 * outstanding packets (for Rx and Tx). When using up to 10 TP sockets * simultaneously, one could define TCP_WIN_SEG_COUNT as 120. */ -#define ipconfigTCP_WIN_SEG_COUNT 240 +#define ipconfigTCP_WIN_SEG_COUNT 240 /* Each TCP socket has a circular buffers for Rx and Tx, which have a fixed * maximum size. Define the size of Rx buffer for TCP sockets. */ -#define ipconfigTCP_RX_BUFFER_LENGTH ( 1000 ) +#define ipconfigTCP_RX_BUFFER_LENGTH ( 1000 ) /* Define the size of Tx buffer for TCP sockets. */ -#define ipconfigTCP_TX_BUFFER_LENGTH ( 1000 ) +#define ipconfigTCP_TX_BUFFER_LENGTH ( 1000 ) /* When using call-back handlers, the driver may check if the handler points to * real program memory (RAM or flash) or just has a random non-zero value. */ diff --git a/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/demo_config.h b/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/demo_config.h index 62ec41f69b..1ec381981e 100644 --- a/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/demo_config.h +++ b/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask/demo_config.h @@ -154,13 +154,77 @@ * for an MQTT broker that only has an IP address but no hostname. However, * SNI should be enabled whenever possible. */ -#define democonfigDISABLE_SNI ( pdFALSE ) +#define democonfigDISABLE_SNI ( pdFALSE ) + +/** + * @brief Configuration that indicates if the demo connection is made to the AWS IoT Core MQTT broker. + * + * If username/password based authentication is used, the demo will use appropriate TLS ALPN and + * SNI configurations as required for the Custom Authentication feature of AWS IoT. + * For more information, refer to the following documentation: + * https://docs.aws.amazon.com/iot/latest/developerguide/custom-auth.html#custom-auth-mqtt + * + * #define democonfigUSE_AWS_IOT_CORE_BROKER ( 1 ) + */ + +/** + * @brief The username value for authenticating client to the MQTT broker when + * username/password based client authentication is used. + * + * For AWS IoT MQTT broker, refer to the AWS IoT documentation below for + * details regarding client authentication with a username and password. + * https://docs.aws.amazon.com/iot/latest/developerguide/custom-authentication.html + * An authorizer setup needs to be done, as mentioned in the above link, to use + * username/password based client authentication. + * + * #define democonfigCLIENT_USERNAME "...insert here..." + */ + +/** + * @brief The password value for authenticating client to the MQTT broker when + * username/password based client authentication is used. + * + * For AWS IoT MQTT broker, refer to the AWS IoT documentation below for + * details regarding client authentication with a username and password. + * https://docs.aws.amazon.com/iot/latest/developerguide/custom-authentication.html + * An authorizer setup needs to be done, as mentioned in the above link, to use + * username/password based client authentication. + * + * #define democonfigCLIENT_PASSWORD "...insert here..." + */ + +/** + * @brief The name of the operating system that the application is running on. + * The current value is given as an example. Please update for your specific + * operating system. + */ +#define democonfigOS_NAME "FreeRTOS" + +/** + * @brief The version of the operating system that the application is running + * on. The current value is given as an example. Please update for your specific + * operating system version. + */ +#define democonfigOS_VERSION tskKERNEL_VERSION_NUMBER + +/** + * @brief The name of the hardware platform the application is running on. The + * current value is given as an example. Please update for your specific + * hardware platform. + */ +#define democonfigHARDWARE_PLATFORM_NAME "WinSim" + +/** + * @brief The name of the MQTT library used and its version, following an "@" + * symbol. + */ +#define democonfigMQTT_LIB "core-mqtt@1.0.0" /** * @brief Whether to use mutual authentication. If this macro is not set to 1 * or not defined, then plaintext TCP will be used instead of TLS over TCP. */ -#define democonfigUSE_TLS 1 +#define democonfigUSE_TLS 1 /** * @brief Set the stack size of the main demo task. @@ -168,6 +232,6 @@ * In the Windows port, this stack only holds a structure. The actual * stack is created by an operating system thread. */ -#define democonfigDEMO_STACKSIZE configMINIMAL_STACK_SIZE +#define democonfigDEMO_STACKSIZE configMINIMAL_STACK_SIZE #endif /* DEMO_CONFIG_H */