Replace multithreading demo with MQTT Agent (#563)

* Update MQTT agent submodule

* Copy MQTT agent demo files

* Remove other demos from connection manager

* Update demo config and uncrustify

* Update readme files

* Fix headers
pull/567/head^2
Muneeb Ahmed 4 years ago committed by GitHub
parent 7a695784bc
commit ef6194a7ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -44,14 +44,14 @@
/*-----------------------------------------------------------*/
bool Agent_MessageSend( const AgentMessageContext_t * pMsgCtx,
const void * pData,
Command_t * const * pCommandToSend,
uint32_t blockTimeMs )
{
BaseType_t queueStatus = pdFAIL;
if( ( pMsgCtx != NULL ) && ( pData != NULL ) )
if( ( pMsgCtx != NULL ) && ( pCommandToSend != NULL ) )
{
queueStatus = xQueueSendToBack( pMsgCtx->queue, pData, pdMS_TO_TICKS( blockTimeMs ) );
queueStatus = xQueueSendToBack( pMsgCtx->queue, pCommandToSend, pdMS_TO_TICKS( blockTimeMs ) );
}
return ( queueStatus == pdPASS ) ? true : false;
@ -60,14 +60,14 @@ bool Agent_MessageSend( const AgentMessageContext_t * pMsgCtx,
/*-----------------------------------------------------------*/
bool Agent_MessageReceive( const AgentMessageContext_t * pMsgCtx,
void * pBuffer,
Command_t ** pReceivedCommand,
uint32_t blockTimeMs )
{
BaseType_t queueStatus = pdFAIL;
if( ( pMsgCtx != NULL ) && ( pBuffer != NULL ) )
if( ( pMsgCtx != NULL ) && ( pReceivedCommand != NULL ) )
{
queueStatus = xQueueReceive( pMsgCtx->queue, pBuffer, pdMS_TO_TICKS( blockTimeMs ) );
queueStatus = xQueueReceive( pMsgCtx->queue, pReceivedCommand, pdMS_TO_TICKS( blockTimeMs ) );
}
return ( queueStatus == pdPASS ) ? true : false;

@ -104,7 +104,6 @@ void Agent_InitializePool( void )
Command_t * Agent_GetCommand( uint32_t blockTimeMs )
{
Command_t * structToUse = NULL;
size_t i;
bool structRetrieved = false;
/* Check queue has been created. */
@ -125,7 +124,6 @@ Command_t * Agent_GetCommand( uint32_t blockTimeMs )
bool Agent_ReleaseCommand( Command_t * pCommandToRelease )
{
size_t i;
bool structReturned = false;
configASSERT( initStatus == QUEUE_INITIALIZED );

@ -58,13 +58,13 @@ struct AgentMessageContext
* Must be thread safe.
*
* @param[in] pMsgCtx An #AgentMessageContext_t.
* @param[in] pData Pointer to element to send to queue.
* @param[in] pCommandToSend Pointer to address to send to queue.
* @param[in] blockTimeMs Block time to wait for a send.
*
* @return `true` if send was successful, else `false`.
*/
bool Agent_MessageSend( const AgentMessageContext_t * pMsgCtx,
const void * pData,
Command_t * const * pCommandToSend,
uint32_t blockTimeMs );
/**
@ -72,13 +72,13 @@ bool Agent_MessageSend( const AgentMessageContext_t * pMsgCtx,
* Must be thread safe.
*
* @param[in] pMsgCtx An #AgentMessageContext_t.
* @param[in] pBuffer Pointer to buffer to write received data.
* @param[in] pReceivedCommand Pointer to write address of received command.
* @param[in] blockTimeMs Block time to wait for a receive.
*
* @return `true` if receive was successful, else `false`.
*/
bool Agent_MessageReceive( const AgentMessageContext_t * pMsgCtx,
void * pBuffer,
Command_t ** pReceivedCommand,
uint32_t blockTimeMs );
#endif /* FREERTOS_AGENT_MESSAGE_H */

@ -0,0 +1,952 @@
/*
* FreeRTOS V202012.00
* Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* https://www.FreeRTOS.org
* https://github.com/FreeRTOS
*
*/
/*
* This demo creates multiple tasks, all of which use the MQTT agent API to
* communicate with an MQTT broker through the same MQTT connection.
*
* This file contains the initial task created after the TCP/IP stack connects
* to the network. The task:
*
* 1) Connects to the MQTT broker.
* 2) Creates the other demo tasks, in accordance with the #defines set in
* demo_config.h. For example, if demo_config.h contains the following
* setting:
*
* #define democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE 3
*
* then the initial task will create three instances of the task
* implemented in simple_sub_pub_demo.c. See the comments at the top
* of that file for more information.
*
* 3) After creating the demo tasks the initial task could create the MQTT
* agent task. However, as it has no other operations to perform, rather
* than create the MQTT agent as a separate task the initial task just calls
* the agent's implementing function - effectively turning itself into the
* MQTT agent.
*/
/* Standard includes. */
#include <string.h>
#include <stdio.h>
#include <assert.h>
/* Kernel includes. */
#include "FreeRTOS.h"
#include "queue.h"
#include "task.h"
/* FreeRTOS+TCP includes. */
#include "FreeRTOS_IP.h"
#include "FreeRTOS_Sockets.h"
/* Demo Specific configs. */
#include "demo_config.h"
/* MQTT library includes. */
#include "core_mqtt.h"
/* MQTT agent include. */
#include "mqtt_agent.h"
/* MQTT Agent ports. */
#include "freertos_agent_message.h"
#include "freertos_command_pool.h"
/* Exponential backoff retry include. */
#include "backoff_algorithm.h"
/* Subscription manager header include. */
#include "subscription_manager.h"
/* Transport interface include. */
#if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 )
#include "using_mbedtls.h"
#else
#include "using_plaintext.h"
#endif
/* This demo uses compile time options to select the demo tasks to created.
* Ensure the compile time options are defined. These should be defined in
* demo_config.h. */
#if !defined( democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE ) || ( democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE < 1 )
#error Please set democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE to the number of tasks to create in vStartSimpleSubscribePublishTask().
#endif
#if ( democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE > 0 ) && !defined( democonfigSIMPLE_SUB_PUB_TASK_STACK_SIZE )
#error Please define democonfigSIMPLE_SUB_PUB_TASK_STACK_SIZE in demo_config.h to set the stack size (in words, not bytes) for the tasks created by vStartSimpleSubscribePublishTask().
#endif
/**
* @brief Dimensions the buffer used to serialize and deserialize MQTT packets.
* @note Specified in bytes. Must be large enough to hold the maximum
* anticipated MQTT payload.
*/
#ifndef MQTT_AGENT_NETWORK_BUFFER_SIZE
#define MQTT_AGENT_NETWORK_BUFFER_SIZE ( 5000 )
#endif
/**
* @brief The length of the queue used to hold commands for the agent.
*/
#ifndef MQTT_AGENT_COMMAND_QUEUE_LENGTH
#define MQTT_AGENT_COMMAND_QUEUE_LENGTH ( 10 )
#endif
/**
* These configuration settings are required to run the demo.
*/
/**
* @brief Timeout for receiving CONNACK after sending an MQTT CONNECT packet.
* Defined in milliseconds.
*/
#define mqttexampleCONNACK_RECV_TIMEOUT_MS ( 1000U )
/**
* @brief The maximum number of retries for network operation with server.
*/
#define RETRY_MAX_ATTEMPTS ( 5U )
/**
* @brief The maximum back-off delay (in milliseconds) for retrying failed operation
* with server.
*/
#define RETRY_MAX_BACKOFF_DELAY_MS ( 5000U )
/**
* @brief The base back-off delay (in milliseconds) to use for network operation retry
* attempts.
*/
#define RETRY_BACKOFF_BASE_MS ( 500U )
/**
* @brief The maximum time interval in seconds which is allowed to elapse
* between two Control Packets.
*
* It is the responsibility of the Client to ensure that the interval between
* Control Packets being sent does not exceed the this Keep Alive value. In the
* absence of sending any other Control Packets, the Client MUST send a
* PINGREQ Packet.
*//*_RB_ Move to be the responsibility of the agent. */
#define mqttexampleKEEP_ALIVE_INTERVAL_SECONDS ( 60U )
/**
* @brief Socket send and receive timeouts to use. Specified in milliseconds.
*/
#define mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 750 )
/**
* @brief Used to convert times to/from ticks and milliseconds.
*/
#define mqttexampleMILLISECONDS_PER_SECOND ( 1000U )
#define mqttexampleMILLISECONDS_PER_TICK ( mqttexampleMILLISECONDS_PER_SECOND / configTICK_RATE_HZ )
/*-----------------------------------------------------------*/
/**
* @brief Each compilation unit that consumes the NetworkContext must define it.
* It should contain a single pointer to the type of your desired transport.
* When using multiple transports in the same compilation unit, define this pointer as void *.
*
* @note Transport stacks are defined in FreeRTOS-Plus/Source/Application-Protocols/network_transport.
*/
struct NetworkContext
{
#if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 )
TlsTransportParams_t * pParams;
#else
PlaintextTransportParams_t * pParams;
#endif
};
/*-----------------------------------------------------------*/
/**
* @brief Initializes an MQTT context, including transport interface and
* network buffer.
*
* @return `MQTTSuccess` if the initialization succeeds, else `MQTTBadParameter`.
*/
static MQTTStatus_t prvMQTTInit( void );
/**
* @brief Sends an MQTT Connect packet over the already connected TCP socket.
*
* @param[in] pxMQTTContext MQTT context pointer.
* @param[in] xCleanSession If a clean session should be established.
*
* @return `MQTTSuccess` if connection succeeds, else appropriate error code
* from MQTT_Connect.
*/
static MQTTStatus_t prvMQTTConnect( bool xCleanSession );
/**
* @brief Connect a TCP socket to the MQTT broker.
*
* @param[in] pxNetworkContext Network context.
*
* @return `pdPASS` if connection succeeds, else `pdFAIL`.
*/
static BaseType_t prvSocketConnect( NetworkContext_t * pxNetworkContext );
/**
* @brief Disconnect a TCP connection.
*
* @param[in] pxNetworkContext Network context.
*
* @return `pdPASS` if disconnect succeeds, else `pdFAIL`.
*/
static BaseType_t prvSocketDisconnect( NetworkContext_t * pxNetworkContext );
/**
* @brief Callback executed when there is activity on the TCP socket that is
* connected to the MQTT broker. If there are no messages in the MQTT agent's
* command queue then the callback send a message to ensure the MQTT agent
* task unblocks and can therefore process whatever is necessary on the socket
* (if anything) as quickly as possible.
*
* @param[in] pxSocket Socket with data, unused.
*/
static void prvMQTTClientSocketWakeupCallback( Socket_t pxSocket );
/**
* @brief Fan out the incoming publishes to the callbacks registered by different
* tasks. If there are no callbacks registered for the incoming publish, it will be
* passed to the unsolicited publish handler.
*
* @param[in] pMqttAgentContext Agent context.
* @param[in] packetId Packet ID of publish.
* @param[in] pxPublishInfo Info of incoming publish.
*/
static void prvIncomingPublishCallback( MQTTAgentContext_t * pMqttAgentContext,
uint16_t packetId,
MQTTPublishInfo_t * pxPublishInfo );
/**
* @brief Function to attempt to resubscribe to the topics already present in the
* subscription list.
*
* This function will be invoked when this demo requests the broker to
* reestablish the session and the broker cannot do so. This function will
* enqueue commands to the MQTT Agent queue and will be processed once the
* command loop starts.
*
* @return `MQTTSuccess` if adding subscribes to the command queue succeeds, else
* appropriate error code from MQTTAgent_Subscribe.
* */
static MQTTStatus_t prvHandleResubscribe( void );
/**
* @brief Passed into MQTTAgent_Subscribe() as the callback to execute when the
* broker ACKs the SUBSCRIBE message. This callback implementation is used for
* handling the completion of resubscribes. Any topic filter failed to resubscribe
* will be removed from the subscription list.
*
* See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call
*
* @param[in] pxCommandContext Context of the initial command.
* @param[in] pxReturnInfo The result of the command.
*/
static void prvSubscriptionCommandCallback( void * pxCommandContext,
MQTTAgentReturnInfo_t * pxReturnInfo );
/**
* @brief Task used to run the MQTT agent. In this example the first task that
* is created is responsible for creating all the other demo tasks. Then,
* rather than create prvMQTTAgentTask() as a separate task, it simply calls
* prvMQTTAgentTask() to become the agent task itself.
*
* This task calls MQTTAgent_CommandLoop() in a loop, until MQTTAgent_Terminate()
* is called. If an error occurs in the command loop, then it will reconnect the
* TCP and MQTT connections.
*
* @param[in] pvParameters Parameters as passed at the time of task creation. Not
* used in this example.
*/
static void prvMQTTAgentTask( void * pvParameters );
/**
* @brief The main task used in the MQTT demo.
*
* This task creates the network connection and all other demo tasks. Then,
* rather than create prvMQTTAgentTask() as a separate task, it simply calls
* prvMQTTAgentTask() to become the agent task itself.
*
* @param[in] pvParameters Parameters as passed at the time of task creation. Not
* used in this example.
*/
static void prvConnectAndCreateDemoTasks( void * pvParameters );
/**
* @brief The timer query function provided to the MQTT context.
*
* @return Time in milliseconds.
*/
static uint32_t prvGetTimeMs( void );
/**
* @brief Connects a TCP socket to the MQTT broker, then creates and MQTT
* connection to the same.
*/
static void prvConnectToMQTTBroker( void );
/*
* Functions that start the tasks demonstrated by this project.
*/
extern void vStartSimpleSubscribePublishTask( uint32_t ulTaskNumber,
configSTACK_DEPTH_TYPE uxStackSize,
UBaseType_t uxPriority );
/*-----------------------------------------------------------*/
/**
* @brief The network context used by the MQTT library transport interface.
* See https://www.freertos.org/network-interface.html
*/
static NetworkContext_t xNetworkContext;
#if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 )
/**
* @brief The parameters for the network context using a TLS channel.
*/
static TlsTransportParams_t xTlsTransportParams;
#else
/**
* @brief The parameters for the network context using a non-encrypted channel.
*/
static PlaintextTransportParams_t xPlaintextTransportParams;
#endif
/**
* @brief Global entry time into the application to use as a reference timestamp
* in the #prvGetTimeMs function. #prvGetTimeMs will always return the difference
* between the current time and the global entry time. This will reduce the chances
* of overflow for the 32 bit unsigned integer used for holding the timestamp.
*/
static uint32_t ulGlobalEntryTimeMs;
MQTTAgentContext_t xGlobalMqttAgentContext;
static uint8_t xNetworkBuffer[ MQTT_AGENT_NETWORK_BUFFER_SIZE ];
static AgentMessageContext_t xCommandQueue;
/**
* @brief The global array of subscription elements.
*
* @note No thread safety is required to this array, since the updates the array
* elements are done only from one task at a time. The subscription manager
* implementation expects that the array of the subscription elements used for
* storing subscriptions to be initialized to 0. As this is a global array, it
* will be initialized to 0 by default.
*/
SubscriptionElement_t xGlobalSubscriptionList[ SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS ];
/*-----------------------------------------------------------*/
/*
* @brief Create the task that demonstrates the MQTT Connection sharing demo.
*/
void vStartSimpleMQTTDemo( void )
{
/* prvConnectAndCreateDemoTasks() connects to the MQTT broker, creates the
* tasks that will interact with the broker via the MQTT agent, then turns
* itself into the MQTT agent task. */
xTaskCreate( prvConnectAndCreateDemoTasks, /* Function that implements the task. */
"ConnectManager", /* Text name for the task - only used for debugging. */
democonfigDEMO_STACKSIZE, /* Size of stack (in words, not bytes) to allocate for the task. */
NULL, /* Optional - task parameter - not used in this case. */
tskIDLE_PRIORITY + 1, /* Task priority, must be between 0 and configMAX_PRIORITIES - 1. */
NULL ); /* Optional - used to pass out a handle to the created task. */
}
/*-----------------------------------------------------------*/
static MQTTStatus_t prvMQTTInit( void )
{
TransportInterface_t xTransport;
MQTTStatus_t xReturn;
MQTTFixedBuffer_t xFixedBuffer = { .pBuffer = xNetworkBuffer, .size = MQTT_AGENT_NETWORK_BUFFER_SIZE };
static uint8_t staticQueueStorageArea[ MQTT_AGENT_COMMAND_QUEUE_LENGTH * sizeof( Command_t * ) ];
static StaticQueue_t staticQueueStructure;
AgentMessageInterface_t messageInterface =
{
.pMsgCtx = NULL,
.send = Agent_MessageSend,
.recv = Agent_MessageReceive,
.getCommand = Agent_GetCommand,
.releaseCommand = Agent_ReleaseCommand
};
LogDebug( ( "Creating command queue." ) );
xCommandQueue.queue = xQueueCreateStatic( MQTT_AGENT_COMMAND_QUEUE_LENGTH,
sizeof( Command_t * ),
staticQueueStorageArea,
&staticQueueStructure );
configASSERT( xCommandQueue.queue );
messageInterface.pMsgCtx = &xCommandQueue;
/* Initialize the task pool. */
Agent_InitializePool();
/* Fill in Transport Interface send and receive function pointers. */
xTransport.pNetworkContext = &xNetworkContext;
#if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 )
xTransport.send = TLS_FreeRTOS_send;
xTransport.recv = TLS_FreeRTOS_recv;
#else
xTransport.send = Plaintext_FreeRTOS_send;
xTransport.recv = Plaintext_FreeRTOS_recv;
#endif
/* Initialize MQTT library. */
xReturn = MQTTAgent_Init( &xGlobalMqttAgentContext,
&messageInterface,
&xFixedBuffer,
&xTransport,
prvGetTimeMs,
prvIncomingPublishCallback,
/* Context to pass into the callback. Passing the pointer to subscription array. */
xGlobalSubscriptionList );
return xReturn;
}
/*-----------------------------------------------------------*/
static MQTTStatus_t prvMQTTConnect( bool xCleanSession )
{
MQTTStatus_t xResult;
MQTTConnectInfo_t xConnectInfo;
bool xSessionPresent = false;
/* Many fields are not used in this demo so start with everything at 0. */
memset( &xConnectInfo, 0x00, sizeof( xConnectInfo ) );
/* Start with a clean session i.e. direct the MQTT broker to discard any
* previous session data. Also, establishing a connection with clean session
* will ensure that the broker does not store any data when this client
* gets disconnected. */
xConnectInfo.cleanSession = xCleanSession;
/* The client identifier is used to uniquely identify this MQTT client to
* the MQTT broker. In a production device the identifier can be something
* unique, such as a device serial number. */
xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER;
xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER );
/* Set MQTT keep-alive period. It is the responsibility of the application
* to ensure that the interval between Control Packets being sent does not
* exceed the Keep Alive value. In the absence of sending any other Control
* Packets, the Client MUST send a PINGREQ Packet. This responsibility will
* be moved inside the agent. */
xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_INTERVAL_SECONDS;
/* Append metrics when connecting to the AWS IoT Core broker. */
#ifdef democonfigUSE_AWS_IOT_CORE_BROKER
#ifdef democonfigCLIENT_USERNAME
xConnectInfo.pUserName = CLIENT_USERNAME_WITH_METRICS;
xConnectInfo.userNameLength = ( uint16_t ) strlen( CLIENT_USERNAME_WITH_METRICS );
xConnectInfo.pPassword = democonfigCLIENT_PASSWORD;
xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD );
#else
xConnectInfo.pUserName = AWS_IOT_METRICS_STRING;
xConnectInfo.userNameLength = AWS_IOT_METRICS_STRING_LENGTH;
/* Password for authentication is not used. */
xConnectInfo.pPassword = NULL;
xConnectInfo.passwordLength = 0U;
#endif
#else /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */
#ifdef democonfigCLIENT_USERNAME
xConnectInfo.pUserName = democonfigCLIENT_USERNAME;
xConnectInfo.userNameLength = ( uint16_t ) strlen( democonfigCLIENT_USERNAME );
xConnectInfo.pPassword = democonfigCLIENT_PASSWORD;
xConnectInfo.passwordLength = ( uint16_t ) strlen( democonfigCLIENT_PASSWORD );
#endif /* ifdef democonfigCLIENT_USERNAME */
#endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */
/* Send MQTT CONNECT packet to broker. MQTT's Last Will and Testament feature
* is not used in this demo, so it is passed as NULL. */
xResult = MQTT_Connect( &( xGlobalMqttAgentContext.mqttContext ),
&xConnectInfo,
NULL,
mqttexampleCONNACK_RECV_TIMEOUT_MS,
&xSessionPresent );
LogInfo( ( "Session present: %d\n", xSessionPresent ) );
/* Resume a session if desired. */
if( ( xResult == MQTTSuccess ) && ( xCleanSession == false ) )
{
xResult = MQTTAgent_ResumeSession( &xGlobalMqttAgentContext, xSessionPresent );
/* Resubscribe to all the subscribed topics. */
if( ( xResult == MQTTSuccess ) && ( xSessionPresent == false ) )
{
xResult = prvHandleResubscribe();
}
}
return xResult;
}
/*-----------------------------------------------------------*/
static MQTTStatus_t prvHandleResubscribe( void )
{
MQTTStatus_t xResult = MQTTBadParameter;
uint32_t ulIndex = 0U;
uint16_t usNumSubscriptions = 0U;
/* These variables need to stay in scope until command completes. */
static MQTTAgentSubscribeArgs_t xSubArgs = { 0 };
static MQTTSubscribeInfo_t xSubInfo[ SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS ] = { 0 };
static CommandInfo_t xCommandParams = { 0 };
/* Loop through each subscription in the subscription list and add a subscribe
* command to the command queue. */
for( ulIndex = 0U; ulIndex < SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; ulIndex++ )
{
/* Check if there is a subscription in the subscription list. This demo
* doesn't check for duplicate subscriptions. */
if( xGlobalSubscriptionList[ ulIndex ].usFilterStringLength != 0 )
{
xSubInfo[ usNumSubscriptions ].pTopicFilter = xGlobalSubscriptionList[ ulIndex ].pcSubscriptionFilterString;
xSubInfo[ usNumSubscriptions ].topicFilterLength = xGlobalSubscriptionList[ ulIndex ].usFilterStringLength;
/* QoS1 is used for all the subscriptions in this demo. */
xSubInfo[ usNumSubscriptions ].qos = MQTTQoS1;
LogInfo( ( "Resubscribe to the topic %.*s will be attempted.",
xSubInfo[ usNumSubscriptions ].topicFilterLength,
xSubInfo[ usNumSubscriptions ].pTopicFilter ) );
usNumSubscriptions++;
}
}
if( usNumSubscriptions > 0U )
{
xSubArgs.pSubscribeInfo = xSubInfo;
xSubArgs.numSubscriptions = usNumSubscriptions;
/* The block time can be 0 as the command loop is not running at this point. */
xCommandParams.blockTimeMs = 0U;
xCommandParams.cmdCompleteCallback = prvSubscriptionCommandCallback;
xCommandParams.pCmdCompleteCallbackContext = ( void * ) &xSubArgs;
/* Enqueue subscribe to the command queue. These commands will be processed only
* when command loop starts. */
xResult = MQTTAgent_Subscribe( &xGlobalMqttAgentContext, &xSubArgs, &xCommandParams );
}
else
{
/* Mark the resubscribe as success if there is nothing to be subscribed. */
xResult = MQTTSuccess;
}
if( xResult != MQTTSuccess )
{
LogError( ( "Failed to enqueue the MQTT subscribe command. xResult=%s.",
MQTT_Status_strerror( xResult ) ) );
}
return xResult;
}
/*-----------------------------------------------------------*/
static void prvSubscriptionCommandCallback( void * pxCommandContext,
MQTTAgentReturnInfo_t * pxReturnInfo )
{
size_t lIndex = 0;
MQTTAgentSubscribeArgs_t * pxSubscribeArgs = ( MQTTAgentSubscribeArgs_t * ) pxCommandContext;
/* If the return code is success, no further action is required as all the topic filters
* are already part of the subscription list. */
if( pxReturnInfo->returnCode != MQTTSuccess )
{
/* Check through each of the suback codes and determine if there are any failures. */
for( lIndex = 0; lIndex < pxSubscribeArgs->numSubscriptions; lIndex++ )
{
/* This demo doesn't attempt to resubscribe in the event that a SUBACK failed. */
if( pxReturnInfo->pSubackCodes[ lIndex ] == MQTTSubAckFailure )
{
LogError( ( "Failed to resubscribe to topic %.*s.",
pxSubscribeArgs->pSubscribeInfo[ lIndex ].topicFilterLength,
pxSubscribeArgs->pSubscribeInfo[ lIndex ].pTopicFilter ) );
/* Remove subscription callback for unsubscribe. */
removeSubscription( xGlobalSubscriptionList,
pxSubscribeArgs->pSubscribeInfo[ lIndex ].pTopicFilter,
pxSubscribeArgs->pSubscribeInfo[ lIndex ].topicFilterLength );
}
}
/* Hit an assert as some of the tasks won't be able to proceed correctly without
* the subscriptions. This logic will be updated with exponential backoff and retry. */
configASSERT( pdTRUE );
}
}
/*-----------------------------------------------------------*/
static BaseType_t prvSocketConnect( NetworkContext_t * pxNetworkContext )
{
BaseType_t xConnected = pdFAIL;
BackoffAlgorithmStatus_t xBackoffAlgStatus = BackoffAlgorithmSuccess;
BackoffAlgorithmContext_t xReconnectParams = { 0 };
uint16_t usNextRetryBackOff = 0U;
const TickType_t xTransportTimeout = 0UL;
#if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 )
TlsTransportStatus_t xNetworkStatus = TLS_TRANSPORT_CONNECT_FAILURE;
NetworkCredentials_t xNetworkCredentials = { 0 };
#ifdef democonfigUSE_AWS_IOT_CORE_BROKER
/* ALPN protocols must be a NULL-terminated list of strings. Therefore,
* the first entry will contain the actual ALPN protocol string while the
* second entry must remain NULL. */
char * pcAlpnProtocols[] = { NULL, NULL };
/* The ALPN string changes depending on whether username/password authentication is used. */
#ifdef democonfigCLIENT_USERNAME
pcAlpnProtocols[ 0 ] = AWS_IOT_CUSTOM_AUTH_ALPN;
#else
pcAlpnProtocols[ 0 ] = AWS_IOT_MQTT_ALPN;
#endif
xNetworkCredentials.pAlpnProtos = pcAlpnProtocols;
#endif /* ifdef democonfigUSE_AWS_IOT_CORE_BROKER */
/* Set the credentials for establishing a TLS connection. */
xNetworkCredentials.pRootCa = ( const unsigned char * ) democonfigROOT_CA_PEM;
xNetworkCredentials.rootCaSize = sizeof( democonfigROOT_CA_PEM );
#ifdef democonfigCLIENT_CERTIFICATE_PEM
xNetworkCredentials.pClientCert = ( const unsigned char * ) democonfigCLIENT_CERTIFICATE_PEM;
xNetworkCredentials.clientCertSize = sizeof( democonfigCLIENT_CERTIFICATE_PEM );
xNetworkCredentials.pPrivateKey = ( const unsigned char * ) democonfigCLIENT_PRIVATE_KEY_PEM;
xNetworkCredentials.privateKeySize = sizeof( democonfigCLIENT_PRIVATE_KEY_PEM );
#endif
xNetworkCredentials.disableSni = democonfigDISABLE_SNI;
#else /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */
PlaintextTransportStatus_t xNetworkStatus = PLAINTEXT_TRANSPORT_CONNECT_FAILURE;
#endif /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */
/* We will use a retry mechanism with an exponential backoff mechanism and
* jitter. That is done to prevent a fleet of IoT devices all trying to
* reconnect at exactly the same time should they become disconnected at
* the same time. We initialize reconnect attempts and interval here. */
BackoffAlgorithm_InitializeParams( &xReconnectParams,
RETRY_BACKOFF_BASE_MS,
RETRY_MAX_BACKOFF_DELAY_MS,
RETRY_MAX_ATTEMPTS );
/* Attempt to connect to MQTT broker. If connection fails, retry after a
* timeout. Timeout value will exponentially increase until the maximum
* number of attempts are reached.
*/
do
{
/* Establish a TCP connection with the MQTT broker. This example connects to
* the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and
* democonfigMQTT_BROKER_PORT at the top of this file. */
#if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 )
LogInfo( ( "Creating a TLS connection to %s:%d.",
democonfigMQTT_BROKER_ENDPOINT,
democonfigMQTT_BROKER_PORT ) );
xNetworkStatus = TLS_FreeRTOS_Connect( pxNetworkContext,
democonfigMQTT_BROKER_ENDPOINT,
democonfigMQTT_BROKER_PORT,
&xNetworkCredentials,
mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS,
mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS );
xConnected = ( xNetworkStatus == TLS_TRANSPORT_SUCCESS ) ? pdPASS : pdFAIL;
#else /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */
LogInfo( ( "Creating a TCP connection to %s:%d.",
democonfigMQTT_BROKER_ENDPOINT,
democonfigMQTT_BROKER_PORT ) );
xNetworkStatus = Plaintext_FreeRTOS_Connect( pxNetworkContext,
democonfigMQTT_BROKER_ENDPOINT,
democonfigMQTT_BROKER_PORT,
mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS,
mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS );
xConnected = ( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS ) ? pdPASS : pdFAIL;
#endif /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */
if( !xConnected )
{
/* Get back-off value (in milliseconds) for the next connection retry. */
xBackoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &xReconnectParams, uxRand(), &usNextRetryBackOff );
if( xBackoffAlgStatus == BackoffAlgorithmSuccess )
{
LogWarn( ( "Connection to the broker failed. "
"Retrying connection in %hu ms.",
usNextRetryBackOff ) );
vTaskDelay( pdMS_TO_TICKS( usNextRetryBackOff ) );
}
}
if( xBackoffAlgStatus == BackoffAlgorithmRetriesExhausted )
{
LogError( ( "Connection to the broker failed, all attempts exhausted." ) );
}
} while( ( xConnected != pdPASS ) && ( xBackoffAlgStatus == BackoffAlgorithmSuccess ) );
/* Set the socket wakeup callback and ensure the read block time. */
if( xConnected )
{
( void ) FreeRTOS_setsockopt( pxNetworkContext->pParams->tcpSocket,
0, /* Level - Unused. */
FREERTOS_SO_WAKEUP_CALLBACK,
( void * ) prvMQTTClientSocketWakeupCallback,
sizeof( &( prvMQTTClientSocketWakeupCallback ) ) );
( void ) FreeRTOS_setsockopt( pxNetworkContext->pParams->tcpSocket,
0,
FREERTOS_SO_RCVTIMEO,
&xTransportTimeout,
sizeof( TickType_t ) );
}
return xConnected;
}
/*-----------------------------------------------------------*/
static BaseType_t prvSocketDisconnect( NetworkContext_t * pxNetworkContext )
{
BaseType_t xDisconnected = pdFAIL;
/* Set the wakeup callback to NULL since the socket will disconnect. */
( void ) FreeRTOS_setsockopt( pxNetworkContext->pParams->tcpSocket,
0, /* Level - Unused. */
FREERTOS_SO_WAKEUP_CALLBACK,
( void * ) NULL,
sizeof( void * ) );
#if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 )
LogInfo( ( "Disconnecting TLS connection.\n" ) );
TLS_FreeRTOS_Disconnect( pxNetworkContext );
xDisconnected = pdPASS;
#else
LogInfo( ( "Disconnecting TCP connection.\n" ) );
PlaintextTransportStatus_t xNetworkStatus = PLAINTEXT_TRANSPORT_CONNECT_FAILURE;
xNetworkStatus = Plaintext_FreeRTOS_Disconnect( pxNetworkContext );
xDisconnected = ( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS ) ? pdPASS : pdFAIL;
#endif
return xDisconnected;
}
/*-----------------------------------------------------------*/
static void prvMQTTClientSocketWakeupCallback( Socket_t pxSocket )
{
CommandInfo_t xCommandParams = { 0 };
/* Just to avoid compiler warnings. The socket is not used but the function
* prototype cannot be changed because this is a callback function. */
( void ) pxSocket;
/* A socket used by the MQTT task may need attention. Send an event
* to the MQTT task to make sure the task is not blocked on xCommandQueue. */
if( ( uxQueueMessagesWaiting( xCommandQueue.queue ) == 0U ) && ( FreeRTOS_recvcount( pxSocket ) > 0 ) )
{
/* Don't block as this is called from the context of the IP task. */
xCommandParams.blockTimeMs = 0U;
MQTTAgent_ProcessLoop( &xGlobalMqttAgentContext, &xCommandParams );
}
}
/*-----------------------------------------------------------*/
static void prvIncomingPublishCallback( MQTTAgentContext_t * pMqttAgentContext,
uint16_t packetId,
MQTTPublishInfo_t * pxPublishInfo )
{
bool xPublishHandled = false;
char cOriginalChar, * pcLocation;
( void ) packetId;
/* Fan out the incoming publishes to the callbacks registered using
* subscription manager. */
xPublishHandled = handleIncomingPublishes( ( SubscriptionElement_t * ) pMqttAgentContext->pIncomingCallbackContext,
pxPublishInfo );
/* If there are no callbacks to handle the incoming publishes,
* handle it as an unsolicited publish. */
if( xPublishHandled != true )
{
/* Ensure the topic string is terminated for printing. This will over-
* write the message ID, which is restored afterwards. */
pcLocation = ( char * ) &( pxPublishInfo->pTopicName[ pxPublishInfo->topicNameLength ] );
cOriginalChar = *pcLocation;
*pcLocation = 0x00;
LogWarn( ( "WARN: Received an unsolicited publish from topic %s", pxPublishInfo->pTopicName ) );
*pcLocation = cOriginalChar;
}
}
/*-----------------------------------------------------------*/
static void prvMQTTAgentTask( void * pvParameters )
{
BaseType_t xNetworkResult = pdFAIL;
MQTTStatus_t xMQTTStatus = MQTTSuccess, xConnectStatus = MQTTSuccess;
MQTTContext_t * pMqttContext = &( xGlobalMqttAgentContext.mqttContext );
( void ) pvParameters;
do
{
/* MQTTAgent_CommandLoop() is effectively the agent implementation. It
* will manage the MQTT protocol until such time that an error occurs,
* which could be a disconnect. If an error occurs the MQTT context on
* which the error happened is returned so there can be an attempt to
* clean up and reconnect however the application writer prefers. */
xMQTTStatus = MQTTAgent_CommandLoop( &xGlobalMqttAgentContext );
/* Success is returned for disconnect or termination. The socket should
* be disconnected. */
if( ( xMQTTStatus == MQTTSuccess ) && ( xGlobalMqttAgentContext.mqttContext.connectStatus == MQTTNotConnected ) )
{
/* MQTT Disconnect. Disconnect the socket. */
xNetworkResult = prvSocketDisconnect( &xNetworkContext );
configASSERT( xNetworkResult == pdPASS );
}
else if( xMQTTStatus == MQTTSuccess )
{
/* MQTTAgent_Terminate() was called, but MQTT was not disconnected. */
xMQTTStatus = MQTT_Disconnect( &( xGlobalMqttAgentContext.mqttContext ) );
configASSERT( xMQTTStatus == MQTTSuccess );
xNetworkResult = prvSocketDisconnect( &xNetworkContext );
configASSERT( xNetworkResult == pdPASS );
}
/* Error. */
else
{
/* Reconnect TCP. */
xNetworkResult = prvSocketDisconnect( &xNetworkContext );
configASSERT( xNetworkResult == pdPASS );
xNetworkResult = prvSocketConnect( &xNetworkContext );
configASSERT( xNetworkResult == pdPASS );
pMqttContext->connectStatus = MQTTNotConnected;
/* MQTT Connect with a persistent session. */
xConnectStatus = prvMQTTConnect( false );
configASSERT( xConnectStatus == MQTTSuccess );
}
} while( xMQTTStatus != MQTTSuccess );
}
/*-----------------------------------------------------------*/
static void prvConnectToMQTTBroker( void )
{
BaseType_t xNetworkStatus = pdFAIL;
MQTTStatus_t xMQTTStatus;
/* Connect a TCP socket to the broker. */
xNetworkStatus = prvSocketConnect( &xNetworkContext );
configASSERT( xNetworkStatus == pdPASS );
/* Initialize the MQTT context with the buffer and transport interface. */
xMQTTStatus = prvMQTTInit();
configASSERT( xMQTTStatus == MQTTSuccess );
/* Form an MQTT connection without a persistent session. */
xMQTTStatus = prvMQTTConnect( true );
configASSERT( xMQTTStatus == MQTTSuccess );
}
/*-----------------------------------------------------------*/
static void prvConnectAndCreateDemoTasks( void * pvParameters )
{
( void ) pvParameters;
/* Miscellaneous initialization. */
ulGlobalEntryTimeMs = prvGetTimeMs();
/* Set the pParams member of the network context with desired transport. */
#if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 )
xNetworkContext.pParams = &xTlsTransportParams;
#else
xNetworkContext.pParams = &xPlaintextTransportParams;
#endif
/* Create the TCP connection to the broker, then the MQTT connection to the
* same. */
prvConnectToMQTTBroker();
/* Selectively create demo tasks as per the compile time constant settings. */
#if ( democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE > 0 )
{
vStartSimpleSubscribePublishTask( democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE,
democonfigSIMPLE_SUB_PUB_TASK_STACK_SIZE,
tskIDLE_PRIORITY );
}
#endif
/* This task has nothing left to do, so rather than create the MQTT
* agent as a separate thread, it simply calls the function that implements
* the agent - in effect turning itself into the agent. */
prvMQTTAgentTask( NULL );
/* Should not get here. Force an assert if the task returns from
* prvMQTTAgentTask(). */
configASSERT( pvParameters == ( void * ) ~1 );
}
/*-----------------------------------------------------------*/
static uint32_t prvGetTimeMs( void )
{
TickType_t xTickCount = 0;
uint32_t ulTimeMs = 0UL;
/* Get the current tick count. */
xTickCount = xTaskGetTickCount();
/* Convert the ticks to milliseconds. */
ulTimeMs = ( uint32_t ) xTickCount * mqttexampleMILLISECONDS_PER_TICK;
/* Reduce ulGlobalEntryTimeMs from obtained time so as to always return the
* elapsed time in the application. */
ulTimeMs = ( uint32_t ) ( ulTimeMs - ulGlobalEntryTimeMs );
return ulTimeMs;
}
/*-----------------------------------------------------------*/

@ -0,0 +1,528 @@
/*
* FreeRTOS V202012.00
* Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* https://www.FreeRTOS.org
* https://github.com/FreeRTOS
*
*/
/*
* This file demonstrates numerous tasks all of which use the MQTT agent API
* to send unique MQTT payloads to unique topics over the same MQTT connection
* to the same MQTT agent. Some tasks use QoS0 and others QoS1.
*
* Each created task is a unique instance of the task implemented by
* prvSimpleSubscribePublishTask(). prvSimpleSubscribePublishTask()
* subscribes to a topic then periodically publishes a message to the same
* topic to which it has subscribed. The command context sent to
* MQTTAgent_Publish() contains a unique number that is sent back to the task
* as a task notification from the callback function that executes when the
* PUBLISH operation is acknowledged (or just sent in the case of QoS 0). The
* task checks the number it receives from the callback equals the number it
* previously set in the command context before printing out either a success
* or failure message.
*/
/* Standard includes. */
#include <string.h>
#include <stdio.h>
#include <assert.h>
/* Kernel includes. */
#include "FreeRTOS.h"
#include "task.h"
#include "queue.h"
/* Demo Specific configs. */
#include "demo_config.h"
/* MQTT library includes. */
#include "core_mqtt.h"
/* MQTT agent include. */
#include "mqtt_agent.h"
/* Subscription manager header include. */
#include "subscription_manager.h"
/**
* @brief This demo uses task notifications to signal tasks from MQTT callback
* functions. mqttexampleMS_TO_WAIT_FOR_NOTIFICATION defines the time, in ticks,
* to wait for such a callback.
*/
#define mqttexampleMS_TO_WAIT_FOR_NOTIFICATION ( 10000 )
/**
* @brief Size of statically allocated buffers for holding topic names and
* payloads.
*/
#define mqttexampleSTRING_BUFFER_LENGTH ( 100 )
/**
* @brief Delay for each task between publishes.
*/
#define mqttexampleDELAY_BETWEEN_PUBLISH_OPERATIONS_MS ( 1000U )
/**
* @brief Number of publishes done by each task in this demo.
*/
#define mqttexamplePUBLISH_COUNT ( 0xffffffffUL )
/**
* @brief The maximum amount of time in milliseconds to wait for the commands
* to be posted to the MQTT agent should the MQTT agent's command queue be full.
* Tasks wait in the Blocked state, so don't use any CPU time.
*/
#define mqttexampleMAX_COMMAND_SEND_BLOCK_TIME_MS ( 500 )
/*-----------------------------------------------------------*/
/**
* @brief Defines the structure to use as the command callback context in this
* demo.
*/
struct CommandContext
{
MQTTStatus_t xReturnStatus;
TaskHandle_t xTaskToNotify;
uint32_t ulNotificationValue;
void * pArgs;
};
/*-----------------------------------------------------------*/
/**
* @brief Passed into MQTTAgent_Subscribe() as the callback to execute when the
* broker ACKs the SUBSCRIBE message. Its implementation sends a notification
* to the task that called MQTTAgent_Subscribe() to let the task know the
* SUBSCRIBE operation completed. It also sets the xReturnStatus of the
* structure passed in as the command's context to the value of the
* xReturnStatus parameter - which enables the task to check the status of the
* operation.
*
* See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call
*
* @param[in] pxCommandContext Context of the initial command.
* @param[in].xReturnStatus The result of the command.
*/
static void prvSubscribeCommandCallback( void * pxCommandContext,
MQTTAgentReturnInfo_t * pxReturnInfo );
/**
* @brief Passed into MQTTAgent_Publish() as the callback to execute when the
* broker ACKs the PUBLISH message. Its implementation sends a notification
* to the task that called MQTTAgent_Publish() to let the task know the
* PUBLISH operation completed. It also sets the xReturnStatus of the
* structure passed in as the command's context to the value of the
* xReturnStatus parameter - which enables the task to check the status of the
* operation.
*
* See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call
*
* @param[in] pxCommandContext Context of the initial command.
* @param[in].xReturnStatus The result of the command.
*/
static void prvPublishCommandCallback( CommandContext_t * pxCommandContext,
MQTTAgentReturnInfo_t * pxReturnInfo );
/**
* @brief Called by the task to wait for a notification from a callback function
* after the task first executes either MQTTAgent_Publish()* or
* MQTTAgent_Subscribe().
*
* See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call
*
* @param[in] pxCommandContext Context of the initial command.
* @param[out] pulNotifiedValue The task's notification value after it receives
* a notification from the callback.
*
* @return pdTRUE if the task received a notification, otherwise pdFALSE.
*/
static BaseType_t prvWaitForCommandAcknowledgment( uint32_t * pulNotifiedValue );
/**
* @brief Passed into MQTTAgent_Subscribe() as the callback to execute when
* there is an incoming publish on the topic being subscribed to. Its
* implementation just logs information about the incoming publish including
* the publish messages source topic and payload.
*
* See https://freertos.org/mqtt/mqtt-agent-demo.html#example_mqtt_api_call
*
* @param[in] pvIncomingPublishCallbackContext Context of the initial command.
* @param[in] pxPublishInfo Deserialized publish.
*/
static void prvIncomingPublishCallback( void * pvIncomingPublishCallbackContext,
MQTTPublishInfo_t * pxPublishInfo );
/**
* @brief Subscribe to the topic the demo task will also publish to - that
* results in all outgoing publishes being published back to the task
* (effectively echoed back).
*
* @param[in] xQoS The quality of service (QoS) to use. Can be zero or one
* for all MQTT brokers. Can also be QoS2 if supported by the broker. AWS IoT
* does not support QoS2.
*/
static bool prvSubscribeToTopic( MQTTQoS_t xQoS,
char * pcTopicFilter );
/**
* @brief The function that implements the task demonstrated by this file.
*/
static void prvSimpleSubscribePublishTask( void * pvParameters );
/*-----------------------------------------------------------*/
/**
* @brief The MQTT agent manages the MQTT contexts. This set the handle to the
* context used by this demo.
*/
extern MQTTAgentContext_t xGlobalMqttAgentContext;
/*-----------------------------------------------------------*/
/**
* @brief The buffer to hold the topic filter. The topic is generated at runtime
* by adding the task names.
*
* @note The topic strings must persist until unsubscribed.
*/
static char topicBuf[ democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE ][ mqttexampleSTRING_BUFFER_LENGTH ];
/*-----------------------------------------------------------*/
void vStartSimpleSubscribePublishTask( uint32_t ulNumberToCreate,
configSTACK_DEPTH_TYPE uxStackSize,
UBaseType_t uxPriority )
{
char pcTaskNameBuf[ 15 ];
uint32_t ulTaskNumber;
/* Each instance of prvSimpleSubscribePublishTask() generates a unique name
* and topic filter for itself from the number passed in as the task
* parameter. */
/* Create a few instances of vSimpleSubscribePublishTask(). */
for( ulTaskNumber = 0; ulTaskNumber < ulNumberToCreate; ulTaskNumber++ )
{
memset( pcTaskNameBuf, 0x00, sizeof( pcTaskNameBuf ) );
snprintf( pcTaskNameBuf, 10, "SubPub%d", ( int ) ulTaskNumber );
xTaskCreate( prvSimpleSubscribePublishTask,
pcTaskNameBuf,
uxStackSize,
( void * ) ulTaskNumber,
uxPriority,
NULL );
}
}
/*-----------------------------------------------------------*/
static void prvPublishCommandCallback( CommandContext_t * pxCommandContext,
MQTTAgentReturnInfo_t * pxReturnInfo )
{
/* Store the result in the application defined context so the task that
* initiated the publish can check the operation's status. */
pxCommandContext->xReturnStatus = pxReturnInfo->returnCode;
if( pxCommandContext->xTaskToNotify != NULL )
{
/* Send the context's ulNotificationValue as the notification value so
* the receiving task can check the value it set in the context matches
* the value it receives in the notification. */
xTaskNotify( pxCommandContext->xTaskToNotify,
pxCommandContext->ulNotificationValue,
eSetValueWithOverwrite );
}
}
/*-----------------------------------------------------------*/
static void prvSubscribeCommandCallback( void * pxCommandContext,
MQTTAgentReturnInfo_t * pxReturnInfo )
{
bool xSubscriptionAdded = false;
CommandContext_t * pxApplicationDefinedContext = ( CommandContext_t * ) pxCommandContext;
MQTTAgentSubscribeArgs_t * pxSubscribeArgs = ( MQTTAgentSubscribeArgs_t * ) pxApplicationDefinedContext->pArgs;
/* Store the result in the application defined context so the task that
* initiated the subscribe can check the operation's status. Also send the
* status as the notification value. These things are just done for
* demonstration purposes. */
pxApplicationDefinedContext->xReturnStatus = pxReturnInfo->returnCode;
/* Check if the subscribe operation is a success. Only one topic is
* subscribed by this demo. */
if( pxReturnInfo->returnCode == MQTTSuccess )
{
/* Add subscription so that incoming publishes are routed to the application
* callback. */
xSubscriptionAdded = addSubscription( ( SubscriptionElement_t * ) xGlobalMqttAgentContext.pIncomingCallbackContext,
pxSubscribeArgs->pSubscribeInfo->pTopicFilter,
pxSubscribeArgs->pSubscribeInfo->topicFilterLength,
prvIncomingPublishCallback,
NULL );
if( xSubscriptionAdded == false )
{
LogError( ( "Failed to register an incoming publish callback for topic %.*s.",
pxSubscribeArgs->pSubscribeInfo->topicFilterLength,
pxSubscribeArgs->pSubscribeInfo->pTopicFilter ) );
}
}
xTaskNotify( pxApplicationDefinedContext->xTaskToNotify,
( uint32_t ) ( pxReturnInfo->returnCode ),
eSetValueWithOverwrite );
}
/*-----------------------------------------------------------*/
static BaseType_t prvWaitForCommandAcknowledgment( uint32_t * pulNotifiedValue )
{
BaseType_t xReturn;
/* Wait for this task to get notified, passing out the value it gets
* notified with. */
xReturn = xTaskNotifyWait( 0,
0,
pulNotifiedValue,
pdMS_TO_TICKS( mqttexampleMS_TO_WAIT_FOR_NOTIFICATION ) );
return xReturn;
}
/*-----------------------------------------------------------*/
static void prvIncomingPublishCallback( void * pvIncomingPublishCallbackContext,
MQTTPublishInfo_t * pxPublishInfo )
{
static char cTerminatedString[ mqttexampleSTRING_BUFFER_LENGTH ];
( void ) pvIncomingPublishCallbackContext;
/* Create a message that contains the incoming MQTT payload to the logger,
* terminating the string first. */
if( pxPublishInfo->payloadLength < mqttexampleSTRING_BUFFER_LENGTH )
{
memcpy( ( void * ) cTerminatedString, pxPublishInfo->pPayload, pxPublishInfo->payloadLength );
cTerminatedString[ pxPublishInfo->payloadLength ] = 0x00;
}
else
{
memcpy( ( void * ) cTerminatedString, pxPublishInfo->pPayload, mqttexampleSTRING_BUFFER_LENGTH );
cTerminatedString[ mqttexampleSTRING_BUFFER_LENGTH - 1 ] = 0x00;
}
LogInfo( ( "Received incoming publish message %s", cTerminatedString ) );
}
/*-----------------------------------------------------------*/
static bool prvSubscribeToTopic( MQTTQoS_t xQoS,
char * pcTopicFilter )
{
MQTTStatus_t xCommandAdded;
BaseType_t xCommandAcknowledged = pdFALSE;
uint32_t ulSubscribeMessageID;
MQTTAgentSubscribeArgs_t xSubscribeArgs;
MQTTSubscribeInfo_t xSubscribeInfo;
static int32_t ulNextSubscribeMessageID = 0;
CommandContext_t xApplicationDefinedContext = { 0 };
CommandInfo_t xCommandParams = { 0 };
/* Create a unique number of the subscribe that is about to be sent. The number
* is used as the command context and is sent back to this task as a notification
* in the callback that executed upon receipt of the subscription acknowledgment.
* That way this task can match an acknowledgment to a subscription. */
xTaskNotifyStateClear( NULL );
taskENTER_CRITICAL();
{
ulNextSubscribeMessageID++;
ulSubscribeMessageID = ulNextSubscribeMessageID;
}
taskEXIT_CRITICAL();
/* Complete the subscribe information. The topic string must persist for
* duration of subscription! */
xSubscribeInfo.pTopicFilter = pcTopicFilter;
xSubscribeInfo.topicFilterLength = ( uint16_t ) strlen( pcTopicFilter );
xSubscribeInfo.qos = xQoS;
xSubscribeArgs.pSubscribeInfo = &xSubscribeInfo;
xSubscribeArgs.numSubscriptions = 1;
/* Complete an application defined context associated with this subscribe message.
* This gets updated in the callback function so the variable must persist until
* the callback executes. */
xApplicationDefinedContext.ulNotificationValue = ulNextSubscribeMessageID;
xApplicationDefinedContext.xTaskToNotify = xTaskGetCurrentTaskHandle();
xApplicationDefinedContext.pArgs = ( void * ) &xSubscribeArgs;
xCommandParams.blockTimeMs = mqttexampleMAX_COMMAND_SEND_BLOCK_TIME_MS;
xCommandParams.cmdCompleteCallback = prvSubscribeCommandCallback;
xCommandParams.pCmdCompleteCallbackContext = ( void * ) &xApplicationDefinedContext;
/* Loop in case the queue used to communicate with the MQTT agent is full and
* attempts to post to it time out. The queue will not become full if the
* priority of the MQTT agent task is higher than the priority of the task
* calling this function. */
LogInfo( ( "Sending subscribe request to agent for topic filter: %s with id %d",
pcTopicFilter,
( int ) ulSubscribeMessageID ) );
do
{
/* TODO: prvIncomingPublish as publish callback. */
xCommandAdded = MQTTAgent_Subscribe( &xGlobalMqttAgentContext,
&xSubscribeArgs,
&xCommandParams );
} while( xCommandAdded != MQTTSuccess );
/* Wait for acks to the subscribe message - this is optional but done here
* so the code below can check the notification sent by the callback matches
* the ulNextSubscribeMessageID value set in the context above. */
xCommandAcknowledged = prvWaitForCommandAcknowledgment( NULL );
/* Check both ways the status was passed back just for demonstration
* purposes. */
if( ( xCommandAcknowledged != pdTRUE ) ||
( xApplicationDefinedContext.xReturnStatus != MQTTSuccess ) )
{
LogInfo( ( "Error or timed out waiting for ack to subscribe message topic %s",
pcTopicFilter ) );
}
else
{
LogInfo( ( "Received subscribe ack for topic %s containing ID %d",
pcTopicFilter,
( int ) xApplicationDefinedContext.ulNotificationValue ) );
}
return xCommandAcknowledged;
}
/*-----------------------------------------------------------*/
static void prvSimpleSubscribePublishTask( void * pvParameters )
{
extern UBaseType_t uxRand( void );
MQTTPublishInfo_t xPublishInfo = { 0 };
char payloadBuf[ mqttexampleSTRING_BUFFER_LENGTH ];
char taskName[ mqttexampleSTRING_BUFFER_LENGTH ];
CommandContext_t xCommandContext;
uint32_t ulNotification = 0U, ulValueToNotify = 0UL;
MQTTStatus_t xCommandAdded;
uint32_t ulTaskNumber = ( uint32_t ) pvParameters;
MQTTQoS_t xQoS;
TickType_t xTicksToDelay;
CommandInfo_t xCommandParams = { 0 };
char * pcTopicBuffer = topicBuf[ ulTaskNumber ];
/* Have different tasks use different QoS. 0 and 1. 2 can also be used
* if supported by the broker. */
xQoS = ( MQTTQoS_t ) ( ulTaskNumber % 2UL );
/* Create a unique name for this task from the task number that is passed into
* the task using the task's parameter. */
snprintf( taskName, mqttexampleSTRING_BUFFER_LENGTH, "Publisher%d", ( int ) ulTaskNumber );
/* Create a topic name for this task to publish to. */
snprintf( pcTopicBuffer, mqttexampleSTRING_BUFFER_LENGTH, "/filter/%s", taskName );
/* Subscribe to the same topic to which this task will publish. That will
* result in each published message being published from the server back to
* the target. */
prvSubscribeToTopic( xQoS, pcTopicBuffer );
/* Configure the publish operation. */
memset( ( void * ) &xPublishInfo, 0x00, sizeof( xPublishInfo ) );
xPublishInfo.qos = xQoS;
xPublishInfo.pTopicName = pcTopicBuffer;
xPublishInfo.topicNameLength = ( uint16_t ) strlen( pcTopicBuffer );
xPublishInfo.pPayload = payloadBuf;
/* Store the handler to this task in the command context so the callback
* that executes when the command is acknowledged can send a notification
* back to this task. */
memset( ( void * ) &xCommandContext, 0x00, sizeof( xCommandContext ) );
xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle();
xCommandParams.blockTimeMs = mqttexampleMAX_COMMAND_SEND_BLOCK_TIME_MS;
xCommandParams.cmdCompleteCallback = prvPublishCommandCallback;
xCommandParams.pCmdCompleteCallbackContext = &xCommandContext;
/* For a finite number of publishes... */
for( ulValueToNotify = 0UL; ulValueToNotify < mqttexamplePUBLISH_COUNT; ulValueToNotify++ )
{
/* Create a payload to send with the publish message. This contains
* the task name and an incrementing number. */
snprintf( payloadBuf,
mqttexampleSTRING_BUFFER_LENGTH,
"%s publishing message %d",
taskName,
( int ) ulValueToNotify );
xPublishInfo.payloadLength = ( uint16_t ) strlen( payloadBuf );
/* Also store the incrementing number in the command context so it can
* be accessed by the callback that executes when the publish operation
* is acknowledged. */
xCommandContext.ulNotificationValue = ulValueToNotify;
LogInfo( ( "Sending publish request to agent with message \"%s\" on topic \"%s\"",
payloadBuf,
pcTopicBuffer ) );
/* To ensure ulNotification doesn't accidentally hold the expected value
* as it is to be checked against the value sent from the callback.. */
ulNotification = ~ulValueToNotify;
xCommandAdded = MQTTAgent_Publish( &xGlobalMqttAgentContext,
&xPublishInfo,
&xCommandParams );
configASSERT( xCommandAdded == MQTTSuccess );
/* For QoS 1 and 2, wait for the publish acknowledgment. For QoS0,
* wait for the publish to be sent. */
LogInfo( ( "Task %s waiting for publish %d to complete.",
taskName,
ulValueToNotify ) );
prvWaitForCommandAcknowledgment( &ulNotification );
/* The value received by the callback that executed when the publish was
* completed came from the context passed into MQTTAgent_Publish() above,
* so should match the value set in the context above. */
configASSERT( ulNotification == ulValueToNotify );
/* Log statement to indicate successful reception of publish. */
LogInfo( ( "Demo completed successfully.\r\n" ) );
LogInfo( ( "Short delay before next publish... \r\n\r\n" ) );
/* Add a little randomness into the delay so the tasks don't remain
* in lockstep. */
xTicksToDelay = pdMS_TO_TICKS( mqttexampleDELAY_BETWEEN_PUBLISH_OPERATIONS_MS ) +
( uxRand() % 0xff );
vTaskDelay( xTicksToDelay );
}
/* Delete the task if it is complete. */
LogInfo( ( "Task %s completed.", taskName ) );
vTaskDelete( NULL );
}

@ -58,7 +58,7 @@
</Midl>
<ClCompile>
<Optimization>Disabled</Optimization>
<AdditionalIncludeDirectories>..\..\..\..\Source\FreeRTOS-Plus-Trace\Include;..\..\..\..\FreeRTOS-Plus\Source\FreeRTOS-Plus-TCP\include;..\..\..\..\FreeRTOS-Plus\Source\FreeRTOS-Plus-TCP\portable\BufferManagement;..\..\..\..\FreeRTOS-Plus\Source\FreeRTOS-Plus-TCP\portable\Compiler\MSVC;..\..\..\..\FreeRTOS-Plus\Source\Utilities\logging;..\common\WinPCap;..\..\..\..\FreeRTOS\Source\include;..\..\..\..\FreeRTOS\Source\portable\MSVC-MingW;..\..\..\Source\Application-Protocols\coreMQTT\source\include;..\..\..\Source\Application-Protocols\coreMQTT\source\interface;..\..\..\Source\Utilities\backoff_algorithm\source\include;..\..\..\Source\Application-Protocols\network_transport\freertos_plus_tcp;..\..\..\Source\Application-Protocols\network_transport\freertos_plus_tcp\using_plaintext;..\..\..\Source\Application-Protocols\network_transport\freertos_plus_tcp\using_mbedtls;..\..\..\Source\Utilities\mbedtls_freertos;..\..\..\..\Source\mbedtls_utils;..\..\..\ThirdParty\mbedtls\include;.;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories>..\..\..\..\Source\FreeRTOS-Plus-Trace\Include;..\..\..\..\FreeRTOS-Plus\Source\FreeRTOS-Plus-TCP\include;..\..\..\..\FreeRTOS-Plus\Source\FreeRTOS-Plus-TCP\portable\BufferManagement;..\..\..\..\FreeRTOS-Plus\Source\FreeRTOS-Plus-TCP\portable\Compiler\MSVC;..\..\..\..\FreeRTOS-Plus\Source\Utilities\logging;..\common\WinPCap;..\..\..\..\FreeRTOS\Source\include;..\..\..\..\FreeRTOS\Source\portable\MSVC-MingW;..\..\..\Source\Application-Protocols\coreMQTT\source\include;..\..\..\Source\Application-Protocols\coreMQTT\source\interface;..\..\..\Source\Utilities\backoff_algorithm\source\include;..\..\..\Source\Application-Protocols\network_transport\freertos_plus_tcp;..\..\..\Source\Application-Protocols\network_transport\freertos_plus_tcp\using_plaintext;..\..\..\Source\Application-Protocols\network_transport\freertos_plus_tcp\using_mbedtls;..\..\..\Source\Utilities\mbedtls_freertos;..\..\..\..\Source\mbedtls_utils;..\..\..\ThirdParty\mbedtls\include;..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\include;..\..\Common\coreMQTT_Agent_Interface\include/;subscription-manager;.;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<PreprocessorDefinitions>MBEDTLS_CONFIG_FILE="mbedtls_config.h";WIN32;_DEBUG;_CONSOLE;_WIN32_WINNT=0x0500;WINVER=0x400;_CRT_SECURE_NO_WARNINGS;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<MinimalRebuild>false</MinimalRebuild>
<BasicRuntimeChecks>EnableFastChecks</BasicRuntimeChecks>
@ -157,6 +157,8 @@
<ClCompile Include="..\..\..\..\FreeRTOS-Plus\Source\FreeRTOS-Plus-TCP\FreeRTOS_UDP_IP.c" />
<ClCompile Include="..\..\..\..\FreeRTOS-Plus\Source\FreeRTOS-Plus-TCP\portable\BufferManagement\BufferAllocation_2.c" />
<ClCompile Include="..\..\..\..\FreeRTOS-Plus\Source\FreeRTOS-Plus-TCP\portable\NetworkInterface\WinPCap\NetworkInterface.c" />
<ClCompile Include="..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\mqtt_agent.c" />
<ClCompile Include="..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\mqtt_agent_command_functions.c" />
<ClCompile Include="..\..\..\Source\Utilities\mbedtls_freertos\mbedtls_freertos_port.c" />
<ClCompile Include="..\..\..\Source\Utilities\mbedtls_freertos\mbedtls_error.c" />
<ClCompile Include="..\..\..\Source\Utilities\backoff_algorithm\source\backoff_algorithm.c" />
@ -483,8 +485,12 @@
<WarningLevel Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">TurnOffAllWarnings</WarningLevel>
</ClCompile>
<ClCompile Include="..\..\..\..\FreeRTOS-Plus\Demo\Common\Logging\windows\Logging_WinSim.c" />
<ClCompile Include="..\..\Common\coreMQTT_Agent_Interface\freertos_agent_message.c" />
<ClCompile Include="..\..\Common\coreMQTT_Agent_Interface\freertos_command_pool.c" />
<ClCompile Include="..\Common\main.c" />
<ClCompile Include="DemoTasks\MultitaskMQTTExample.c" />
<ClCompile Include="DemoTasks\mqtt-agent-task.c" />
<ClCompile Include="DemoTasks\simple_sub_pub_demo.c" />
<ClCompile Include="subscription-manager\subscription_manager.c" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\..\..\FreeRTOS\Source\include\event_groups.h" />
@ -513,6 +519,9 @@
<ClInclude Include="..\..\..\..\FreeRTOS-Plus\Source\Utilities\logging\logging.h" />
<ClInclude Include="..\..\..\..\FreeRTOS-Plus\Source\Utilities\logging\logging_levels.h" />
<ClInclude Include="..\..\..\..\FreeRTOS-Plus\Source\Utilities\logging\logging_stack.h" />
<ClInclude Include="..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\include\agent_message.h" />
<ClInclude Include="..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\include\mqtt_agent.h" />
<ClInclude Include="..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\include\mqtt_agent_command_functions.h" />
<ClInclude Include="..\..\..\Source\Application-Protocols\coreMQTT\source\include\core_mqtt_config_defaults.h" />
<ClInclude Include="..\..\..\Source\FreeRTOS-Plus-TCP\include\FreeRTOS_errno_TCP.h" />
<ClInclude Include="..\..\..\Source\Utilities\mbedtls_freertos\mbedtls_error.h" />
@ -604,13 +613,16 @@
<ClInclude Include="..\..\..\ThirdParty\mbedtls\include\mbedtls\x509_crt.h" />
<ClInclude Include="..\..\..\ThirdParty\mbedtls\include\mbedtls\x509_csr.h" />
<ClInclude Include="..\..\..\ThirdParty\mbedtls\include\mbedtls\xtea.h" />
<ClInclude Include="..\..\Common\coreMQTT_Agent_Interface\include\freertos_agent_message.h" />
<ClInclude Include="..\..\Common\coreMQTT_Agent_Interface\include\freertos_command_pool.h" />
<ClInclude Include="mbedtls_config.h" />
<ClInclude Include="demo_config.h" />
<ClInclude Include="FreeRTOSConfig.h" />
<ClInclude Include="FreeRTOSIPConfig.h" />
<ClInclude Include="core_mqtt_config.h" />
<ClInclude Include="subscription-manager\subscription_manager.h" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>
</Project>

@ -77,6 +77,21 @@
<Filter Include="Config">
<UniqueIdentifier>{0e25b4a0-53aa-4abd-a4bc-b3658c9c3bc9}</UniqueIdentifier>
</Filter>
<Filter Include="FreeRTOS+\FreeRTOS IoT Libraries\standard\coreMQTT-Agent">
<UniqueIdentifier>{a081aee5-1e00-443e-98b5-ca297fc5b4ac}</UniqueIdentifier>
</Filter>
<Filter Include="FreeRTOS+\FreeRTOS IoT Libraries\standard\coreMQTT-Agent\include">
<UniqueIdentifier>{01a3e0d6-b836-4f1d-8d9d-e83c247a50c7}</UniqueIdentifier>
</Filter>
<Filter Include="FreeRTOS+\FreeRTOS IoT Libraries\platform\mqtt-agent-interface">
<UniqueIdentifier>{7113221b-1ca6-4504-9993-75664336ef8a}</UniqueIdentifier>
</Filter>
<Filter Include="FreeRTOS+\FreeRTOS IoT Libraries\platform\mqtt-agent-interface\include">
<UniqueIdentifier>{d6d1e5f0-f23c-47d1-ba70-3dbc26acf362}</UniqueIdentifier>
</Filter>
<Filter Include="subscription-manager">
<UniqueIdentifier>{775bbd2e-ac31-4bb4-9de2-468c05269332}</UniqueIdentifier>
</Filter>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\..\..\..\FreeRTOS\Source\portable\MSVC-MingW\port.c">
@ -375,7 +390,7 @@
</ClCompile>
<ClCompile Include="..\..\..\..\FreeRTOS-Plus\Demo\Common\Logging\windows\Logging_WinSim.c" />
<ClCompile Include="..\Common\main.c" />
<ClCompile Include="DemoTasks\MultitaskMQTTExample.c">
<ClCompile Include="DemoTasks\mqtt-agent-task.c">
<Filter>DemoTasks</Filter>
</ClCompile>
<ClCompile Include="..\..\..\Source\Utilities\mbedtls_freertos\mbedtls_freertos_port.c">
@ -405,6 +420,24 @@
<ClCompile Include="..\..\..\Source\Application-Protocols\coreMQTT\source\core_mqtt_serializer.c">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\coreMQTT</Filter>
</ClCompile>
<ClCompile Include="DemoTasks\simple_sub_pub_demo.c">
<Filter>DemoTasks</Filter>
</ClCompile>
<ClCompile Include="..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\mqtt_agent.c">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\coreMQTT-Agent</Filter>
</ClCompile>
<ClCompile Include="..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\mqtt_agent_command_functions.c">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\coreMQTT-Agent</Filter>
</ClCompile>
<ClCompile Include="..\..\Common\coreMQTT_Agent_Interface\freertos_agent_message.c">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\platform\mqtt-agent-interface</Filter>
</ClCompile>
<ClCompile Include="..\..\Common\coreMQTT_Agent_Interface\freertos_command_pool.c">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\platform\mqtt-agent-interface</Filter>
</ClCompile>
<ClCompile Include="subscription-manager\subscription_manager.c">
<Filter>subscription-manager</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\..\..\FreeRTOS-Plus\Source\FreeRTOS-Plus-TCP\include\NetworkInterface.h">
@ -773,5 +806,23 @@
<ClInclude Include="..\..\..\Source\Utilities\mbedtls_freertos\mbedtls_error.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\platform\mbedtls</Filter>
</ClInclude>
<ClInclude Include="..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\include\agent_message.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\coreMQTT-Agent\include</Filter>
</ClInclude>
<ClInclude Include="..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\include\mqtt_agent.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\coreMQTT-Agent\include</Filter>
</ClInclude>
<ClInclude Include="..\..\..\Source\Application-Protocols\coreMQTT-Agent\source\include\mqtt_agent_command_functions.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\standard\coreMQTT-Agent\include</Filter>
</ClInclude>
<ClInclude Include="..\..\Common\coreMQTT_Agent_Interface\include\freertos_agent_message.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\platform\mqtt-agent-interface\include</Filter>
</ClInclude>
<ClInclude Include="..\..\Common\coreMQTT_Agent_Interface\include\freertos_command_pool.h">
<Filter>FreeRTOS+\FreeRTOS IoT Libraries\platform\mqtt-agent-interface\include</Filter>
</ClInclude>
<ClInclude Include="subscription-manager\subscription_manager.h">
<Filter>subscription-manager</Filter>
</ClInclude>
</ItemGroup>
</Project>
</Project>

@ -19,10 +19,9 @@
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* http://www.FreeRTOS.org
* http://aws.amazon.com/freertos
* https://www.FreeRTOS.org
* https://github.com/FreeRTOS
*
* 1 tab == 4 spaces!
*/
#ifndef CORE_MQTT_CONFIG_H
#define CORE_MQTT_CONFIG_H
@ -75,6 +74,35 @@ extern void vLoggingPrintf( const char * pcFormatString,
* macro sets the limit on how many simultaneous PUBLISH states an MQTT
* context maintains.
*/
#define MQTT_STATE_ARRAY_MAX_COUNT 20U
#define MQTT_STATE_ARRAY_MAX_COUNT 20U
/*********************** coreMQTT Agent Configurations **********************/
/**
* @brief The maximum number of pending acknowledgments to track for a single
* connection.
*
* @note The MQTT agent tracks MQTT commands (such as PUBLISH and SUBSCRIBE) th
* at are still waiting to be acknowledged. MQTT_AGENT_MAX_OUTSTANDING_ACKS set
* the maximum number of acknowledgments that can be outstanding at any one time.
* The higher this number is the greater the agent's RAM consumption will be.
*/
#define MQTT_AGENT_MAX_OUTSTANDING_ACKS ( 20U )
/**
* @brief Time in MS that the MQTT agent task will wait in the Blocked state (so
* not using any CPU time) for a command to arrive in its command queue before
* exiting the blocked state so it can call MQTT_ProcessLoop().
*
* @note It is important MQTT_ProcessLoop() is called often if there is known
* MQTT traffic, but calling it too often can take processing time away from
* lower priority tasks and waste CPU time and power.
*/
#define MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME ( 1000 )
/**
* @brief The number of command structures to allocate in the pool
* for the agent.
*/
#define MQTT_COMMAND_CONTEXTS_POOL_SIZE 10
#endif /* ifndef CORE_MQTT_CONFIG_H */

@ -246,13 +246,13 @@ extern void vLoggingPrintf( const char * pcFormatString,
* symbol.
*/
#include "core_mqtt.h" /* Include coreMQTT header for MQTT_LIBRARY_VERSION macro. */
#define democonfigMQTT_LIB "core-mqtt@"MQTT_LIBRARY_VERSION
#define democonfigMQTT_LIB "core-mqtt@"MQTT_LIBRARY_VERSION
/**
* @brief Whether to use mutual authentication. If this macro is not set to 1
* or not defined, then plaintext TCP will be used instead of TLS over TCP.
*/
#define democonfigUSE_TLS 1
#define democonfigUSE_TLS 1
/**
* @brief Set the stack size of the main demo task.
@ -260,7 +260,18 @@ extern void vLoggingPrintf( const char * pcFormatString,
* In the Windows port, this stack only holds a structure. The actual
* stack is created by an operating system thread.
*/
#define democonfigDEMO_STACKSIZE configMINIMAL_STACK_SIZE
#define democonfigDEMO_STACKSIZE configMINIMAL_STACK_SIZE
/**
* @brief The number of simple sub-pub tasks to create.
*/
#define democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE 2
#define democonfigSIMPLE_SUB_PUB_TASK_STACK_SIZE ( configMINIMAL_STACK_SIZE )
/**
* @brief The length of the queue used to hold commands for the coreMQTT Agent.
*/
#define MQTT_AGENT_COMMAND_QUEUE_LENGTH 10

@ -2,8 +2,6 @@ The multi threaded example creates an MQTT agent (or daemon task). It is thread
safe because only the agent task is allowed to access the coreMQTT API - hence
the API is only accessed from one FreeRTOS task. Other tasks and interrupts
needing to interact with the MQTT agent do so through a thread safe queue.
We are generalising this technique for future coreMQTT releases, which will have
a re-usable agent component.
! Plain text examples are for ease of evaluation only - product devices should
! always use authenticated and encrypted communication. Never send private or

@ -2,4 +2,4 @@
Prop3=19,11
[InternetShortcut]
IDList=
URL=https://www.freertos.org/mqtt/rtos_mqtt_example.html
URL=https://www.freertos.org/mqtt/mqtt-agent-demo.html?

@ -1,5 +0,0 @@
[{000214A0-0000-0000-C000-000000000046}]
Prop3=19,11
[InternetShortcut]
IDList=
URL=https://www.freertos.org/mqtt/mqtt-agent-demo.html?

@ -0,0 +1,170 @@
/*
* FreeRTOS V202012.00
* Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* https://www.FreeRTOS.org
* https://github.com/FreeRTOS
*
*/
/**
* @file subscription_manager.c
* @brief Functions for managing MQTT subscriptions.
*/
/* Standard includes. */
#include <string.h>
/* Subscription manager header include. */
#include "subscription_manager.h"
bool addSubscription( SubscriptionElement_t * pxSubscriptionList,
const char * pcTopicFilterString,
uint16_t usTopicFilterLength,
IncomingPubCallback_t pxIncomingPublishCallback,
void * pvIncomingPublishCallbackContext )
{
int32_t lIndex = 0;
size_t xAvailableIndex = SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS;
bool xReturnStatus = false;
if( ( pxSubscriptionList == NULL ) ||
( pcTopicFilterString == NULL ) ||
( usTopicFilterLength == 0U ) ||
( pxIncomingPublishCallback == NULL ) )
{
LogError( ( "Invalid parameter. pxSubscriptionList=%p, pcTopicFilterString=%p,"
" usTopicFilterLength=%u, pxIncomingPublishCallback=%p.",
pxSubscriptionList,
pcTopicFilterString,
( unsigned int ) usTopicFilterLength,
pxIncomingPublishCallback ) );
}
else
{
/* Start at end of array, so that we will insert at the first available index.
* Scans backwards to find duplicates. */
for( lIndex = ( int32_t ) SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS - 1; lIndex >= 0; lIndex-- )
{
if( pxSubscriptionList[ lIndex ].usFilterStringLength == 0 )
{
xAvailableIndex = lIndex;
}
else if( ( pxSubscriptionList[ lIndex ].usFilterStringLength == usTopicFilterLength ) &&
( strncmp( pcTopicFilterString, pxSubscriptionList[ lIndex ].pcSubscriptionFilterString, ( size_t ) usTopicFilterLength ) == 0 ) )
{
/* If a subscription already exists, don't do anything. */
if( ( pxSubscriptionList[ lIndex ].pxIncomingPublishCallback == pxIncomingPublishCallback ) &&
( pxSubscriptionList[ lIndex ].pvIncomingPublishCallbackContext == pvIncomingPublishCallbackContext ) )
{
LogWarn( ( "Subscription already exists.\n" ) );
xAvailableIndex = SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS;
xReturnStatus = true;
break;
}
}
}
if( xAvailableIndex < SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS )
{
pxSubscriptionList[ xAvailableIndex ].pcSubscriptionFilterString = pcTopicFilterString;
pxSubscriptionList[ xAvailableIndex ].usFilterStringLength = usTopicFilterLength;
pxSubscriptionList[ xAvailableIndex ].pxIncomingPublishCallback = pxIncomingPublishCallback;
pxSubscriptionList[ xAvailableIndex ].pvIncomingPublishCallbackContext = pvIncomingPublishCallbackContext;
xReturnStatus = true;
}
}
return xReturnStatus;
}
/*-----------------------------------------------------------*/
void removeSubscription( SubscriptionElement_t * pxSubscriptionList,
const char * pcTopicFilterString,
uint16_t usTopicFilterLength )
{
int32_t lIndex = 0;
if( ( pxSubscriptionList == NULL ) ||
( pcTopicFilterString == NULL ) ||
( usTopicFilterLength == 0U ) )
{
LogError( ( "Invalid parameter. pxSubscriptionList=%p, pcTopicFilterString=%p,"
" usTopicFilterLength=%u.",
pxSubscriptionList,
pcTopicFilterString,
( unsigned int ) usTopicFilterLength ) );
}
else
{
for( lIndex = 0; lIndex < SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; lIndex++ )
{
if( pxSubscriptionList[ lIndex ].usFilterStringLength == usTopicFilterLength )
{
if( strncmp( pxSubscriptionList[ lIndex ].pcSubscriptionFilterString, pcTopicFilterString, usTopicFilterLength ) == 0 )
{
memset( &( pxSubscriptionList[ lIndex ] ), 0x00, sizeof( SubscriptionElement_t ) );
}
}
}
}
}
/*-----------------------------------------------------------*/
bool handleIncomingPublishes( SubscriptionElement_t * pxSubscriptionList,
MQTTPublishInfo_t * pxPublishInfo )
{
int32_t lIndex = 0;
bool isMatched = false, publishHandled = false;
if( ( pxSubscriptionList == NULL ) ||
( pxPublishInfo == NULL ) )
{
LogError( ( "Invalid parameter. pxSubscriptionList=%p, pxPublishInfo=%p,",
pxSubscriptionList,
pxPublishInfo ) );
}
else
{
for( lIndex = 0; lIndex < SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS; lIndex++ )
{
if( pxSubscriptionList[ lIndex ].usFilterStringLength > 0 )
{
MQTT_MatchTopic( pxPublishInfo->pTopicName,
pxPublishInfo->topicNameLength,
pxSubscriptionList[ lIndex ].pcSubscriptionFilterString,
pxSubscriptionList[ lIndex ].usFilterStringLength,
&isMatched );
if( isMatched == true )
{
pxSubscriptionList[ lIndex ].pxIncomingPublishCallback( pxSubscriptionList[ lIndex ].pvIncomingPublishCallbackContext,
pxPublishInfo );
publishHandled = true;
}
}
}
}
return publishHandled;
}

@ -0,0 +1,150 @@
/*
* FreeRTOS V202012.00
* Copyright (C) 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*
* https://www.FreeRTOS.org
* https://github.com/FreeRTOS
*
*/
/**
* @file subscription_manager.h
* @brief Functions for managing MQTT subscriptions.
*/
#ifndef SUBSCRIPTION_MANAGER_H
#define SUBSCRIPTION_MANAGER_H
/**************************************************/
/******* DO NOT CHANGE the following order ********/
/**************************************************/
/* Logging related header files are required to be included in the following order:
* 1. Include the header file "logging_levels.h".
* 2. Define LIBRARY_LOG_NAME and LIBRARY_LOG_LEVEL.
* 3. Include the header file "logging_stack.h".
*/
/* Include header that defines log levels. */
#include "logging_levels.h"
/* Logging configuration for the Subscription Manager module. */
#ifndef LIBRARY_LOG_NAME
#define LIBRARY_LOG_NAME "Subscription Manager"
#endif
#ifndef LIBRARY_LOG_LEVEL
#define LIBRARY_LOG_LEVEL LOG_ERROR
#endif
#include "logging_stack.h"
/* Demo config include. */
#include "demo_config.h"
/* core MQTT include. */
#include "core_mqtt.h"
/**
* @brief Maximum number of subscriptions maintained by the subscription manager
* simultaneously in a list.
*/
#ifndef SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS
#define SUBSCRIPTION_MANAGER_MAX_SUBSCRIPTIONS 10U
#endif
/**
* @brief Callback function called when receiving a publish.
*
* @param[in] pvIncomingPublishCallbackContext The incoming publish callback context.
* @param[in] pxPublishInfo Deserialized publish information.
*/
typedef void (* IncomingPubCallback_t )( void * pvIncomingPublishCallbackContext,
MQTTPublishInfo_t * pxPublishInfo );
/**
* @brief An element in the list of subscriptions.
*
* This subscription manager implementation expects that the array of the
* subscription elements used for storing subscriptions to be initialized to 0.
*
* @note This implementation allows multiple tasks to subscribe to the same topic.
* In this case, another element is added to the subscription list, differing
* in the intended publish callback. Also note that the topic filters are not
* copied in the subscription manager and hence the topic filter strings need to
* stay in scope until unsubscribed.
*/
typedef struct subscriptionElement
{
IncomingPubCallback_t pxIncomingPublishCallback;
void * pvIncomingPublishCallbackContext;
uint16_t usFilterStringLength;
const char * pcSubscriptionFilterString;
} SubscriptionElement_t;
/**
* @brief Add a subscription to the subscription list.
*
* @note Multiple tasks can be subscribed to the same topic with different
* context-callback pairs. However, a single context-callback pair may only be
* associated to the same topic filter once.
*
* @param[in] pxSubscriptionList The pointer to the subscription list array.
* @param[in] pcTopicFilterString Topic filter string of subscription.
* @param[in] usTopicFilterLength Length of topic filter string.
* @param[in] pxIncomingPublishCallback Callback function for the subscription.
* @param[in] pvIncomingPublishCallbackContext Context for the subscription callback.
*
* @return `true` if subscription added or exists, `false` if insufficient memory.
*/
bool addSubscription( SubscriptionElement_t * pxSubscriptionList,
const char * pcTopicFilterString,
uint16_t usTopicFilterLength,
IncomingPubCallback_t pxIncomingPublishCallback,
void * pvIncomingPublishCallbackContext );
/**
* @brief Remove a subscription from the subscription list.
*
* @note If the topic filter exists multiple times in the subscription list,
* then every instance of the subscription will be removed.
*
* @param[in] pxSubscriptionList The pointer to the subscription list array.
* @param[in] pcTopicFilterString Topic filter of subscription.
* @param[in] usTopicFilterLength Length of topic filter.
*/
void removeSubscription( SubscriptionElement_t * pxSubscriptionList,
const char * pcTopicFilterString,
uint16_t usTopicFilterLength );
/**
* @brief Handle incoming publishes by invoking the callbacks registered
* for the incoming publish's topic filter.
*
* @param[in] pxSubscriptionList The pointer to the subscription list array.
* @param[in] pxPublishInfo Info of incoming publish.
*
* @return `true` if an application callback could be invoked;
* `false` otherwise.
*/
bool handleIncomingPublishes( SubscriptionElement_t * pxSubscriptionList,
MQTTPublishInfo_t * pxPublishInfo );
#endif /* SUBSCRIPTION_MANAGER_H */

@ -1 +1 @@
Subproject commit 541402274fc6bf52f35e14f39c0c1dd2f9205ad3
Subproject commit d61dd0921bd651c0bfbaa2e41bb0eda56245a36b

@ -1,5 +1,12 @@
Documentation and download available at https://www.FreeRTOS.org/
Changes after FreeRTOS 202012.00 release
+ Added new coreMQTT Agent library.
+ Demo Updates
- MQTT Agent
Changes between FreeRTOS V202011.00 and FreeRTOS V202012.00 released December 2020
+ Includes libraries that are part of the FreeRTOS 202012.00 LTS release.

@ -38,6 +38,13 @@ dependencies:
url: "https://github.com/FreeRTOS/coreMQTT.git"
path: "FreeRTOS-Plus/Source/Application-Protocols/coreMQTT"
- name: "coreMQTT Agent"
version: "v1.0.0"
repository:
type: "git"
url: "https://github.com/FreeRTOS/coreMQTT-Agent.git"
path: "FreeRTOS-Plus/Source/Application-Protocols/coreMQTT-Agent"
- name: "corePKCS11"
version: "v3.0.0"
repository:

Loading…
Cancel
Save