@ -168,13 +168,13 @@
*/
# define OTA_TOPIC_CLIENT_IDENTIFIER_START_IDX ( 12U )
/**
/**
* @ brief Default topic filter for OTA .
* This is used to route all the packets for OTA reserved topics which OTA agent has not subscribed for .
*/
# define OTA_DEFAULT_TOPIC_FILTER OTA_TOPIC_PREFIX "jobs / #"
/**
/**
* @ brief Length of default topic filter .
*/
# define OTA_DEFAULT_TOPIC_FILTER_LENGTH ( ( uint16_t ) ( sizeof( OTA_DEFAULT_TOPIC_FILTER ) - 1 ) )
@ -184,20 +184,20 @@
*/
# define otaexampleMAX_UINT32 ( 0xffffffff )
/**
/**
* @ brief Dimensions the buffer used to serialize and deserialize MQTT packets .
* @ note Specified in bytes . Must be large enough to hold the maximum
* anticipated MQTT payload .
*/
# ifndef MQTT_AGENT_NETWORK_BUFFER_SIZE
# define MQTT_AGENT_NETWORK_BUFFER_SIZE ( 10240 )
# define MQTT_AGENT_NETWORK_BUFFER_SIZE ( 10240 )
# endif
/**
/**
* @ brief The length of the queue used to hold commands for the agent .
*/
# ifndef MQTT_AGENT_COMMAND_QUEUE_LENGTH
# define MQTT_AGENT_COMMAND_QUEUE_LENGTH ( 10 )
# define MQTT_AGENT_COMMAND_QUEUE_LENGTH ( 10 )
# endif
/**
@ -207,7 +207,7 @@
*/
# define MQTT_AGENT_SEND_BLOCK_TIME_MS ( 200U )
/**
/**
* @ brief This demo uses task notifications to signal tasks from MQTT callback
* functions . mqttexampleMS_TO_WAIT_FOR_NOTIFICATION defines the time , in ticks ,
* to wait for such a callback .
@ -247,7 +247,7 @@
*/
# define otaexampleTRANSPORT_SEND_RECV_TIMEOUT_MS ( 750 )
/**
/**
* @ brief Timeout for receiving CONNACK after sending an MQTT CONNECT packet .
* Defined in milliseconds .
*/
@ -281,7 +281,7 @@
# define otaexampleMILLISECONDS_PER_SECOND ( 1000U )
# define otaexampleMILLISECONDS_PER_TICK ( otaexampleMILLISECONDS_PER_SECOND / configTICK_RATE_HZ )
/**
/**
* @ brief The timeout for waiting for the agent to get suspended after closing the
* connection .
*
@ -291,6 +291,133 @@
*/
# define OTA_SUSPEND_TIMEOUT_MS ( 10000U )
/* Compile time error for some undefined configs, and provide default values
* for others . */
# ifndef democonfigMQTT_BROKER_ENDPOINT
# error "Please define democonfigMQTT_BROKER_ENDPOINT in demo_config.h."
# endif
# ifndef democonfigCLIENT_IDENTIFIER
/**
* @ brief The MQTT client identifier used in this example . Each client identifier
* must be unique so edit as required to ensure no two clients connecting to the
* same broker use the same client identifier . Using a # define is for convenience
* of demonstration only - production devices should use something unique to the
* device that can be read from software - such as a production serial number .
*/
# error "Please define democonfigCLIENT_IDENTIFIER in demo_config.h to something unique for this device."
# endif
# if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 )
# ifndef democonfigROOT_CA_PEM
# error "Please define Root CA certificate of the MQTT broker(democonfigROOT_CA_PEM) in demo_config.h."
# endif
/* If no username is defined, then a client certificate/key is required. */
# ifndef democonfigCLIENT_USERNAME
/*
* ! ! ! Please note democonfigCLIENT_PRIVATE_KEY_PEM in used for
* ! ! ! convenience of demonstration only . Production devices should
* ! ! ! store keys securely , such as within a secure element .
*/
# ifndef democonfigCLIENT_CERTIFICATE_PEM
# error "Please define client certificate(democonfigCLIENT_CERTIFICATE_PEM) in demo_config.h."
# endif
# ifndef democonfigCLIENT_PRIVATE_KEY_PEM
# error "Please define client private key(democonfigCLIENT_PRIVATE_KEY_PEM) in demo_config.h."
# endif
# else
/* If a username is defined, a client password also would need to be defined for
* client authentication . */
# ifndef democonfigCLIENT_PASSWORD
# error "Please define client password(democonfigCLIENT_PASSWORD) in demo_config.h for client authentication based on username / password."
# endif
/* AWS IoT MQTT broker port needs to be 443 for client authentication based on
* username / password . */
# if defined( democonfigUSE_AWS_IOT_CORE_BROKER ) && democonfigMQTT_BROKER_PORT != 443
# error "Broker port(democonfigMQTT_BROKER_PORT) should be defined as 443 in demo_config.h for client authentication based on username / password in AWS IoT Core."
# endif
# endif /* ifndef democonfigCLIENT_USERNAME */
# ifndef democonfigMQTT_BROKER_PORT
# define democonfigMQTT_BROKER_PORT ( 8883 )
# endif
# else /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */
# ifndef democonfigMQTT_BROKER_PORT
# define democonfigMQTT_BROKER_PORT ( 1883 )
# endif
# endif /* if defined( democonfigUSE_TLS ) && ( democonfigUSE_TLS == 1 ) */
/**
* @ brief ALPN ( Application - Layer Protocol Negotiation ) protocol name for AWS IoT MQTT .
*
* This will be used if democonfigMQTT_BROKER_PORT is configured as 443 for the AWS IoT MQTT broker .
* Please see more details about the ALPN protocol for AWS IoT MQTT endpoint
* in the link below .
* https : //aws.amazon.com/blogs/iot/mqtt-with-tls-client-authentication-on-port-443-why-it-is-useful-and-how-it-works/
*/
# define AWS_IOT_MQTT_ALPN "\x0ex-amzn-mqtt-ca"
/**
* @ brief This is the ALPN ( Application - Layer Protocol Negotiation ) string
* required by AWS IoT for password - based authentication using TCP port 443.
*/
# define AWS_IOT_CUSTOM_AUTH_ALPN "\x04mqtt"
/**
* Provide default values for undefined configuration settings .
*/
# ifndef democonfigOS_NAME
# define democonfigOS_NAME "FreeRTOS"
# endif
# ifndef democonfigOS_VERSION
# define democonfigOS_VERSION tskKERNEL_VERSION_NUMBER
# endif
# ifndef democonfigHARDWARE_PLATFORM_NAME
# define democonfigHARDWARE_PLATFORM_NAME "WinSim"
# endif
# ifndef democonfigMQTT_LIB
# define democonfigMQTT_LIB "core-mqtt@1.0.0"
# endif
/**
* @ brief The MQTT metrics string expected by AWS IoT .
*/
# define AWS_IOT_METRICS_STRING \
" ?SDK= " democonfigOS_NAME " &Version= " democonfigOS_VERSION \
" &Platform= " democonfigHARDWARE_PLATFORM_NAME " &MQTTLib= " democonfigMQTT_LIB
/**
* @ brief The length of the MQTT metrics string expected by AWS IoT .
*/
# define AWS_IOT_METRICS_STRING_LENGTH ( ( uint16_t ) ( sizeof( AWS_IOT_METRICS_STRING ) - 1 ) )
# ifdef democonfigCLIENT_USERNAME
/**
* @ brief Append the username with the metrics string if # democonfigCLIENT_USERNAME is defined .
*
* This is to support both metrics reporting and username / password based client
* authentication by AWS IoT .
*/
# define CLIENT_USERNAME_WITH_METRICS democonfigCLIENT_USERNAME AWS_IOT_METRICS_STRING
# endif
/**
* @ brief Length of client identifier .
*/
# define democonfigCLIENT_IDENTIFIER_LENGTH ( ( uint16_t ) ( sizeof( democonfigCLIENT_IDENTIFIER ) - 1 ) )
/*---------------------------------------------------------*/
/**
@ -326,7 +453,6 @@ struct NetworkContext
TlsTransportParams_t * pParams ;
} ;
/*---------------------------------------------------------*/
/**
@ -429,7 +555,7 @@ static MQTTAgentContext_t xGlobalMqttAgentContext;
*
* @ param [ in ] pParam Can be used to pass down functionality to the agent task
*/
static void prvMQTTAgentTask ( void * pParam ) ;
static void prvMQTTAgentTask ( void * pParam ) ;
/**
* @ brief Function used by OTA agent to publish control messages to the MQTT broker .
@ -539,14 +665,14 @@ static void prvOTAAgentTask( void * pvParam );
*
* @ param [ in ] pvParam Any parameters to be passed to OTA Demo task .
*/
static void vOtaDemoTask ( void * pvParam ) ;
static void vOtaDemoTask ( void * pvParam ) ;
/**
* @ brief The function which implements the flow for OTA demo .
*
* @ return pdPASS if success or pdFAIL .
*/
static BaseType_t prvRunOTADemo ( void ) ;
static BaseType_t prvRunOTADemo ( void ) ;
/**
* @ brief Callback registered with the OTA library that notifies the OTA agent
@ -588,7 +714,7 @@ static void prvMqttDefaultCallback( void * pvIncomingPublishCallbackContext,
* @ brief Attempt to connect to the MQTT broker .
*
*/
static void prvConnectToMQTTBroker ( void ) ;
static void prvConnectToMQTTBroker ( void ) ;
/**
* @ brief Retry logic to establish a connection to the MQTT broker .
@ -599,14 +725,14 @@ static void prvConnectToMQTTBroker(void);
* @ param [ in ] pNetworkContext Network context to connect on .
* @ return int pdFALSE if connection failed after retries .
*/
static BaseType_t prvSocketConnect ( NetworkContext_t * pNetworkContext ) ;
static BaseType_t prvSocketConnect ( NetworkContext_t * pNetworkContext ) ;
/**
* @ brief Disconnects from the MQTT broker .
* Initiates an MQTT disconnect and then teardown underlying TCP connection .
*
*/
static void prvDisconnectFromMQTTBroker ( void ) ;
static void prvDisconnectFromMQTTBroker ( void ) ;
/**
* @ brief Initializes an MQTT context , including transport interface and
@ -614,7 +740,7 @@ static void prvDisconnectFromMQTTBroker(void);
*
* @ return ` MQTTSuccess ` if the initialization succeeds , else ` MQTTBadParameter ` .
*/
static MQTTStatus_t prvMqttInit ( void ) ;
static MQTTStatus_t prvMqttInit ( void ) ;
/**
* @ brief Sends an MQTT Connect packet over the already connected TCP socket .
@ -634,22 +760,22 @@ static MQTTStatus_t prvMQTTConnect( bool xCleanSession );
* @ param [ in ] topicFilterLength length of the topic filter .
*
*/
static void prvRegisterOTACallback ( const char * pTopicFilter ,
uint16_t topicFilterLength ) ;
static void prvRegisterOTACallback ( const char * pTopicFilter ,
uint16_t topicFilterLength ) ;
/**
* @ brief Suspend OTA demo .
*
* @ return pPASS or pdFAIL .
*/
static BaseType_t prvSuspendOTA ( void ) ;
static BaseType_t prvSuspendOTA ( void ) ;
/**
* @ brief Resume OTA demo .
*
* @ return pPASS or pdFAIL .
*/
static BaseType_t prvResumeOTA ( void ) ;
static BaseType_t prvResumeOTA ( void ) ;
/**
* @ brief Set OTA interfaces .
@ -658,7 +784,7 @@ static BaseType_t prvResumeOTA(void);
*
* @ return None .
*/
static void setOtaInterfaces ( OtaInterfaces_t * pOtaInterfaces ) ;
static void setOtaInterfaces ( OtaInterfaces_t * pOtaInterfaces ) ;
/**
* @ brief Structure containing all application allocated buffers used by the OTA agent .
@ -918,9 +1044,9 @@ static void prvMqttDataCallback( void * pvIncomingPublishCallbackContext,
pxData = prvOTAEventBufferGet ( ) ;
if ( pxData ! = NULL )
if ( pxData ! = NULL )
{
memcpy ( pxData - > data , pxPublishInfo - > pPayload , pxPublishInfo - > payloadLength ) ;
memcpy ( pxData - > data , pxPublishInfo - > pPayload , pxPublishInfo - > payloadLength ) ;
pxData - > dataLength = pxPublishInfo - > payloadLength ;
eventMsg . eventId = OtaAgentEventReceivedFileBlock ;
eventMsg . pEventData = pxData ;
@ -947,46 +1073,45 @@ static void prvCommandCallback( MQTTAgentCommandContext_t * pxCommandContext,
}
}
static void prvMQTTSubscribeCompleteCallback ( MQTTAgentCommandContext_t * pxCommandContext ,
MQTTAgentReturnInfo_t * pxReturnInfo )
static void prvMQTTSubscribeCompleteCallback ( MQTTAgentCommandContext_t * pxCommandContext ,
MQTTAgentReturnInfo_t * pxReturnInfo )
{
MQTTAgentSubscribeArgs_t * pSubsribeArgs ;
MQTTAgentSubscribeArgs_t * pSubsribeArgs ;
if ( pxReturnInfo - > returnCode = = MQTTSuccess )
if ( pxReturnInfo - > returnCode = = MQTTSuccess )
{
pSubsribeArgs = ( MQTTAgentSubscribeArgs_t * ) ( pxCommandContext - > pArgs ) ;
prvRegisterOTACallback ( pSubsribeArgs - > pSubscribeInfo - > pTopicFilter , pSubsribeArgs - > pSubscribeInfo - > topicFilterLength ) ;
pSubsribeArgs = ( MQTTAgentSubscribeArgs_t * ) ( pxCommandContext - > pArgs ) ;
prvRegisterOTACallback ( pSubsribeArgs - > pSubscribeInfo - > pTopicFilter , pSubsribeArgs - > pSubscribeInfo - > topicFilterLength ) ;
}
/* Store the result in the application defined context so the task that
* initiated the publish can check the operation ' s status . */
pxCommandContext - > xReturnStatus = pxReturnInfo - > returnCode ;
if ( pxCommandContext - > xTaskToNotify ! = NULL )
if ( pxCommandContext - > xTaskToNotify ! = NULL )
{
/* Send the context's ulNotificationValue as the notification value so
* the receiving task can check the value it set in the context matches
* the value it receives in the notification . */
xTaskNotify ( pxCommandContext - > xTaskToNotify , ( uint32_t ) ( pxReturnInfo - > returnCode ) , eSetValueWithOverwrite ) ;
xTaskNotify ( pxCommandContext - > xTaskToNotify , ( uint32_t ) ( pxReturnInfo - > returnCode ) , eSetValueWithOverwrite ) ;
}
}
/*-----------------------------------------------------------*/
static void prvMQTTUnsubscribeCompleteCallback ( MQTTAgentCommandContext_t * pxCommandContext ,
MQTTAgentReturnInfo_t * pxReturnInfo )
static void prvMQTTUnsubscribeCompleteCallback ( MQTTAgentCommandContext_t * pxCommandContext ,
MQTTAgentReturnInfo_t * pxReturnInfo )
{
/* Store the result in the application defined context so the task that
* initiated the publish can check the operation ' s status . */
pxCommandContext - > xReturnStatus = pxReturnInfo - > returnCode ;
if ( pxCommandContext - > xTaskToNotify ! = NULL )
if ( pxCommandContext - > xTaskToNotify ! = NULL )
{
/* Send the context's ulNotificationValue as the notification value so
* the receiving task can check the value it set in the context matches
* the value it receives in the notification . */
xTaskNotify ( pxCommandContext - > xTaskToNotify , ( uint32_t ) ( pxReturnInfo - > returnCode ) , eSetValueWithOverwrite ) ;
xTaskNotify ( pxCommandContext - > xTaskToNotify , ( uint32_t ) ( pxReturnInfo - > returnCode ) , eSetValueWithOverwrite ) ;
}
}
@ -1058,15 +1183,15 @@ static void prvSubscriptionCommandCallback( void * pxCommandContext,
for ( xIndex = 0 ; xIndex < pxSubscribeArgs - > numSubscriptions ; xIndex + + )
{
/* This demo doesn't attempt to resubscribe in the event that a SUBACK failed. */
if ( pxReturnInfo - > pSubackCodes [ xIndex ] = = MQTTSubAckFailure )
if ( pxReturnInfo - > pSubackCodes [ xIndex ] = = MQTTSubAckFailure )
{
LogError ( ( " Failed to resubscribe to topic %.*s. " ,
pxSubscribeArgs - > pSubscribeInfo [ xIndex ] . topicFilterLength ,
pxSubscribeArgs - > pSubscribeInfo [ xIndex ] . pTopicFilter ) ) ;
pxSubscribeArgs - > pSubscribeInfo [ xIndex ] . topicFilterLength ,
pxSubscribeArgs - > pSubscribeInfo [ xIndex ] . pTopicFilter ) ) ;
/* Remove subscription callback for unsubscribe. */
removeSubscription ( xGlobalSubscriptionList ,
pxSubscribeArgs - > pSubscribeInfo [ xIndex ] . pTopicFilter ,
pxSubscribeArgs - > pSubscribeInfo [ xIndex ] . topicFilterLength ) ;
pxSubscribeArgs - > pSubscribeInfo [ xIndex ] . pTopicFilter ,
pxSubscribeArgs - > pSubscribeInfo [ xIndex ] . topicFilterLength ) ;
}
}
@ -1274,12 +1399,12 @@ static BaseType_t prvSocketConnect( NetworkContext_t * pxNetworkContext )
/*-----------------------------------------------------------*/
static BaseType_t prvSocketDisconnect ( NetworkContext_t * pxNetworkContext )
static BaseType_t prvSocketDisconnect ( NetworkContext_t * pxNetworkContext )
{
BaseType_t xDisconnected = pdFAIL ;
LogInfo ( ( " Disconnecting TLS connection. \n " ) ) ;
TLS_FreeRTOS_Disconnect ( pxNetworkContext ) ;
LogInfo ( ( " Disconnecting TLS connection. \n " ) ) ;
TLS_FreeRTOS_Disconnect ( pxNetworkContext ) ;
xDisconnected = pdPASS ;
return xDisconnected ;
@ -1303,7 +1428,7 @@ static MQTTStatus_t prvMQTTInit( void )
LogDebug ( ( " Creating command queue. " ) ) ;
xCommandQueue . queue = xQueueCreateStatic ( MQTT_AGENT_COMMAND_QUEUE_LENGTH ,
sizeof ( MQTTAgentCommand_t * ) ,
sizeof ( MQTTAgentCommand_t * ) ,
staticQueueStorageArea ,
& staticQueueStructure ) ;
configASSERT ( xCommandQueue . queue ) ;
@ -1456,9 +1581,9 @@ static void prvDisconnectFromMQTTBroker( void )
prvSocketDisconnect ( & xNetworkContextMqtt ) ;
}
static OtaMqttStatus_t prvMQTTSubscribe ( const char * pTopicFilter ,
static OtaMqttStatus_t prvMQTTSubscribe ( const char * pTopicFilter ,
uint16_t topicFilterLength ,
uint8_t ucQoS )
uint8_t ucQoS )
{
MQTTStatus_t mqttStatus ;
uint32_t ulNotifiedValue ;
@ -1469,8 +1594,8 @@ static OtaMqttStatus_t prvMQTTSubscribe(const char* pTopicFilter,
MQTTAgentCommandContext_t xApplicationDefinedContext = { 0 } ;
OtaMqttStatus_t otaRet = OtaMqttSuccess ;
configASSERT ( pTopicFilter ! = NULL ) ;
configASSERT ( topicFilterLength > 0 ) ;
configASSERT ( pTopicFilter ! = NULL ) ;
configASSERT ( topicFilterLength > 0 ) ;
xSubscribeInfo . pTopicFilter = pTopicFilter ;
xSubscribeInfo . topicFilterLength = topicFilterLength ;
@ -1484,21 +1609,21 @@ static OtaMqttStatus_t prvMQTTSubscribe(const char* pTopicFilter,
xCommandParams . blockTimeMs = otaexampleMQTT_TIMEOUT_MS ;
xCommandParams . cmdCompleteCallback = prvMQTTSubscribeCompleteCallback ;
xCommandParams . pCmdCompleteCallbackContext = ( void * ) & xApplicationDefinedContext ;
xCommandParams . pCmdCompleteCallbackContext = ( void * ) & xApplicationDefinedContext ;
xTaskNotifyStateClear ( NULL ) ;
xTaskNotifyStateClear ( NULL ) ;
mqttStatus = MQTTAgent_Subscribe ( & xGlobalMqttAgentContext ,
mqttStatus = MQTTAgent_Subscribe ( & xGlobalMqttAgentContext ,
& xSubscribeArgs ,
& xCommandParams ) ;
& xCommandParams ) ;
/* Wait for command to complete so MQTTSubscribeInfo_t remains in scope for the
* duration of the command . */
if ( mqttStatus = = MQTTSuccess )
if ( mqttStatus = = MQTTSuccess )
{
result = xTaskNotifyWait ( 0 , otaexampleMAX_UINT32 , & ulNotifiedValue , pdMS_TO_TICKS ( otaexampleMQTT_TIMEOUT_MS ) ) ;
result = xTaskNotifyWait ( 0 , otaexampleMAX_UINT32 , & ulNotifiedValue , pdMS_TO_TICKS ( otaexampleMQTT_TIMEOUT_MS ) ) ;
if ( result = = pdTRUE )
if ( result = = pdTRUE )
{
mqttStatus = xApplicationDefinedContext . xReturnStatus ;
}
@ -1508,18 +1633,18 @@ static OtaMqttStatus_t prvMQTTSubscribe(const char* pTopicFilter,
}
}
if ( mqttStatus ! = MQTTSuccess )
if ( mqttStatus ! = MQTTSuccess )
{
LogError ( ( " Failed to SUBSCRIBE to topic with error = %u. " ,
mqttStatus ) ) ;
LogError ( ( " Failed to SUBSCRIBE to topic with error = %u. " ,
mqttStatus ) ) ;
otaRet = OtaMqttSubscribeFailed ;
}
else
{
LogInfo ( ( " Subscribed to topic %.*s. \n \n " ,
LogInfo ( ( " Subscribed to topic %.*s. \n \n " ,
topicFilterLength ,
pTopicFilter ) ) ;
pTopicFilter ) ) ;
otaRet = OtaMqttSuccess ;
}
@ -1527,11 +1652,11 @@ static OtaMqttStatus_t prvMQTTSubscribe(const char* pTopicFilter,
return otaRet ;
}
static OtaMqttStatus_t prvMQTTPublish ( const char * const pacTopic ,
static OtaMqttStatus_t prvMQTTPublish ( const char * const pacTopic ,
uint16_t topicLen ,
const char * pMsg ,
const char * pMsg ,
uint32_t msgSize ,
uint8_t qos )
uint8_t qos )
{
OtaMqttStatus_t otaRet = OtaMqttSuccess ;
BaseType_t result ;
@ -1547,23 +1672,23 @@ static OtaMqttStatus_t prvMQTTPublish(const char* const pacTopic,
publishInfo . payloadLength = msgSize ;
xCommandContext . xTaskToNotify = xTaskGetCurrentTaskHandle ( ) ;
xTaskNotifyStateClear ( NULL ) ;
xTaskNotifyStateClear ( NULL ) ;
xCommandParams . blockTimeMs = otaexampleMQTT_TIMEOUT_MS ;
xCommandParams . cmdCompleteCallback = prvCommandCallback ;
xCommandParams . pCmdCompleteCallbackContext = ( void * ) & xCommandContext ;
xCommandParams . pCmdCompleteCallbackContext = ( void * ) & xCommandContext ;
mqttStatus = MQTTAgent_Publish ( & xGlobalMqttAgentContext ,
mqttStatus = MQTTAgent_Publish ( & xGlobalMqttAgentContext ,
& publishInfo ,
& xCommandParams ) ;
& xCommandParams ) ;
/* Wait for command to complete so MQTTSubscribeInfo_t remains in scope for the
* duration of the command . */
if ( mqttStatus = = MQTTSuccess )
if ( mqttStatus = = MQTTSuccess )
{
result = xTaskNotifyWait ( 0 , otaexampleMAX_UINT32 , NULL , pdMS_TO_TICKS ( otaexampleMQTT_TIMEOUT_MS ) ) ;
result = xTaskNotifyWait ( 0 , otaexampleMAX_UINT32 , NULL , pdMS_TO_TICKS ( otaexampleMQTT_TIMEOUT_MS ) ) ;
if ( result ! = pdTRUE )
if ( result ! = pdTRUE )
{
mqttStatus = MQTTSendFailed ;
}
@ -1573,16 +1698,16 @@ static OtaMqttStatus_t prvMQTTPublish(const char* const pacTopic,
}
}
if ( mqttStatus ! = MQTTSuccess )
if ( mqttStatus ! = MQTTSuccess )
{
LogError ( ( " Failed to send PUBLISH packet to broker with error = %u. " , mqttStatus ) ) ;
LogError ( ( " Failed to send PUBLISH packet to broker with error = %u. " , mqttStatus ) ) ;
otaRet = OtaMqttPublishFailed ;
}
else
{
LogInfo ( ( " Sent PUBLISH packet to broker %.*s to broker. \n \n " ,
LogInfo ( ( " Sent PUBLISH packet to broker %.*s to broker. \n \n " ,
topicLen ,
pacTopic ) ) ;
pacTopic ) ) ;
otaRet = OtaMqttSuccess ;
}
@ -1590,9 +1715,9 @@ static OtaMqttStatus_t prvMQTTPublish(const char* const pacTopic,
return otaRet ;
}
static OtaMqttStatus_t prvMQTTUnsubscribe ( const char * pTopicFilter ,
static OtaMqttStatus_t prvMQTTUnsubscribe ( const char * pTopicFilter ,
uint16_t topicFilterLength ,
uint8_t ucQoS )
uint8_t ucQoS )
{
MQTTStatus_t mqttStatus ;
uint32_t ulNotifiedValue ;
@ -1603,8 +1728,8 @@ static OtaMqttStatus_t prvMQTTUnsubscribe(const char* pTopicFilter,
MQTTAgentCommandContext_t xApplicationDefinedContext = { 0 } ;
OtaMqttStatus_t otaRet = OtaMqttSuccess ;
configASSERT ( pTopicFilter ! = NULL ) ;
configASSERT ( topicFilterLength > 0 ) ;
configASSERT ( pTopicFilter ! = NULL ) ;
configASSERT ( topicFilterLength > 0 ) ;
xSubscribeInfo . pTopicFilter = pTopicFilter ;
xSubscribeInfo . topicFilterLength = topicFilterLength ;
@ -1617,23 +1742,23 @@ static OtaMqttStatus_t prvMQTTUnsubscribe(const char* pTopicFilter,
xCommandParams . blockTimeMs = otaexampleMQTT_TIMEOUT_MS ;
xCommandParams . cmdCompleteCallback = prvMQTTUnsubscribeCompleteCallback ;
xCommandParams . pCmdCompleteCallbackContext = ( void * ) & xApplicationDefinedContext ;
xCommandParams . pCmdCompleteCallbackContext = ( void * ) & xApplicationDefinedContext ;
LogInfo ( ( " Unsubscribing to topic filter: %s " , pTopicFilter ) ) ;
xTaskNotifyStateClear ( NULL ) ;
LogInfo ( ( " Unsubscribing to topic filter: %s " , pTopicFilter ) ) ;
xTaskNotifyStateClear ( NULL ) ;
mqttStatus = MQTTAgent_Unsubscribe ( & xGlobalMqttAgentContext ,
mqttStatus = MQTTAgent_Unsubscribe ( & xGlobalMqttAgentContext ,
& xSubscribeArgs ,
& xCommandParams ) ;
& xCommandParams ) ;
/* Wait for command to complete so MQTTSubscribeInfo_t remains in scope for the
* duration of the command . */
if ( mqttStatus = = MQTTSuccess )
if ( mqttStatus = = MQTTSuccess )
{
result = xTaskNotifyWait ( 0 , otaexampleMAX_UINT32 , & ulNotifiedValue , pdMS_TO_TICKS ( otaexampleMQTT_TIMEOUT_MS ) ) ;
result = xTaskNotifyWait ( 0 , otaexampleMAX_UINT32 , & ulNotifiedValue , pdMS_TO_TICKS ( otaexampleMQTT_TIMEOUT_MS ) ) ;
if ( result = = pdTRUE )
if ( result = = pdTRUE )
{
mqttStatus = xApplicationDefinedContext . xReturnStatus ;
}
@ -1643,20 +1768,20 @@ static OtaMqttStatus_t prvMQTTUnsubscribe(const char* pTopicFilter,
}
}
if ( mqttStatus ! = MQTTSuccess )
if ( mqttStatus ! = MQTTSuccess )
{
LogError ( ( " Failed to UNSUBSCRIBE from topic %.*s with error = %u. " ,
LogError ( ( " Failed to UNSUBSCRIBE from topic %.*s with error = %u. " ,
topicFilterLength ,
pTopicFilter ,
mqttStatus ) ) ;
mqttStatus ) ) ;
otaRet = OtaMqttUnsubscribeFailed ;
}
else
{
LogInfo ( ( " UNSUBSCRIBED from topic %.*s. \n \n " ,
LogInfo ( ( " UNSUBSCRIBED from topic %.*s. \n \n " ,
topicFilterLength ,
pTopicFilter ) ) ;
pTopicFilter ) ) ;
otaRet = OtaMqttSuccess ;
}
@ -1666,9 +1791,9 @@ static OtaMqttStatus_t prvMQTTUnsubscribe(const char* pTopicFilter,
/*-----------------------------------------------------------*/
static void setOtaInterfaces ( OtaInterfaces_t * pOtaInterfaces )
static void setOtaInterfaces ( OtaInterfaces_t * pOtaInterfaces )
{
configASSERT ( pOtaInterfaces ! = NULL ) ;
configASSERT ( pOtaInterfaces ! = NULL ) ;
/* Initialize OTA library OS Interface. */
pOtaInterfaces - > os . event . init = OtaInitEvent_FreeRTOS ;
@ -1699,21 +1824,21 @@ static void setOtaInterfaces(OtaInterfaces_t* pOtaInterfaces)
/*-----------------------------------------------------------*/
static void prvOTAAgentTask ( void * pParam )
static void prvOTAAgentTask ( void * pParam )
{
/* Calling OTA agent task. */
OTA_EventProcessingTask ( pParam ) ;
LogInfo ( ( " OTA Agent stopped. " ) ) ;
OTA_EventProcessingTask ( pParam ) ;
LogInfo ( ( " OTA Agent stopped. " ) ) ;
vTaskDelete ( NULL ) ;
vTaskDelete ( NULL ) ;
}
static void prvMQTTAgentTask ( void * pParam )
static void prvMQTTAgentTask ( void * pParam )
{
BaseType_t xResult = pdFAIL ;
MQTTStatus_t xMQTTStatus = MQTTSuccess ;
( void ) pParam ;
( void ) pParam ;
do
{
@ -1722,36 +1847,36 @@ static void prvMQTTAgentTask(void* pParam)
* which could be a disconnect . If an error occurs the MQTT context on
* which the error happened is returned so there can be an attempt to
* clean up and reconnect however the application writer prefers . */
xMQTTStatus = MQTTAgent_CommandLoop ( & xGlobalMqttAgentContext ) ;
xMQTTStatus = MQTTAgent_CommandLoop ( & xGlobalMqttAgentContext ) ;
/* Clear Agent queue so that no any pending MQTT operations are processed. */
MQTTAgent_CancelAll ( & xGlobalMqttAgentContext ) ;
MQTTAgent_CancelAll ( & xGlobalMqttAgentContext ) ;
/* Success is returned for application initiated disconnect or termination. The socket will also be disconnected by the caller. */
if ( xMQTTStatus ! = MQTTSuccess )
if ( xMQTTStatus ! = MQTTSuccess )
{
xResult = prvSuspendOTA ( ) ;
configASSERT ( xResult = = pdPASS ) ;
configASSERT ( xResult = = pdPASS ) ;
LogInfo ( ( " Suspended OTA agent. " ) ) ;
LogInfo ( ( " Suspended OTA agent. " ) ) ;
/* End TLS session, then close TCP connection. */
prvSocketDisconnect ( & xNetworkContextMqtt ) ;
prvSocketDisconnect ( & xNetworkContextMqtt ) ;
/* Connect to MQTT broker. */
prvConnectToMQTTBroker ( ) ;
xResult = prvResumeOTA ( ) ;
configASSERT ( xResult = = pdPASS ) ;
configASSERT ( xResult = = pdPASS ) ;
LogInfo ( ( " Resumed OTA agent. " ) ) ;
LogInfo ( ( " Resumed OTA agent. " ) ) ;
}
} while ( xMQTTStatus ! = MQTTSuccess ) ;
} while ( xMQTTStatus ! = MQTTSuccess ) ;
vTaskDelete ( NULL ) ;
vTaskDelete ( NULL ) ;
}
static BaseType_t prvSuspendOTA ( void )
static BaseType_t prvSuspendOTA ( void )
{
/* OTA library return status. */
OtaErr_t otaRet = OtaErrNone ;
@ -1760,33 +1885,33 @@ static BaseType_t prvSuspendOTA(void)
otaRet = OTA_Suspend ( ) ;
if ( otaRet = = OtaErrNone )
if ( otaRet = = OtaErrNone )
{
suspendTimeout = OTA_SUSPEND_TIMEOUT_MS ;
while ( ( OTA_GetState ( ) ! = OtaAgentStateSuspended ) & & ( suspendTimeout > 0 ) )
while ( ( OTA_GetState ( ) ! = OtaAgentStateSuspended ) & & ( suspendTimeout > 0 ) )
{
/* Wait for OTA Library state to suspend */
vTaskDelay ( pdMS_TO_TICKS ( otaexampleTASK_DELAY_MS ) ) ;
vTaskDelay ( pdMS_TO_TICKS ( otaexampleTASK_DELAY_MS ) ) ;
suspendTimeout - = otaexampleTASK_DELAY_MS ;
}
if ( OTA_GetState ( ) ! = OtaAgentStateSuspended )
if ( OTA_GetState ( ) ! = OtaAgentStateSuspended )
{
LogError ( ( " Failed to suspend OTA. " ) ) ;
LogError ( ( " Failed to suspend OTA. " ) ) ;
status = pdFAIL ;
}
}
else
{
LogError ( ( " Error while trying to suspend OTA agent %d " , otaRet ) ) ;
LogError ( ( " Error while trying to suspend OTA agent %d " , otaRet ) ) ;
status = pdFAIL ;
}
return status ;
}
static BaseType_t prvResumeOTA ( void )
static BaseType_t prvResumeOTA ( void )
{
/* OTA library return status. */
OtaErr_t otaRet = OtaErrNone ;
@ -1795,26 +1920,26 @@ static BaseType_t prvResumeOTA(void)
otaRet = OTA_Resume ( ) ;
if ( otaRet = = OtaErrNone )
if ( otaRet = = OtaErrNone )
{
suspendTimeout = OTA_SUSPEND_TIMEOUT_MS ;
while ( ( OTA_GetState ( ) = = OtaAgentStateSuspended ) & & ( suspendTimeout > 0 ) )
while ( ( OTA_GetState ( ) = = OtaAgentStateSuspended ) & & ( suspendTimeout > 0 ) )
{
/* Wait for OTA Library state to suspend */
vTaskDelay ( pdMS_TO_TICKS ( otaexampleTASK_DELAY_MS ) ) ;
vTaskDelay ( pdMS_TO_TICKS ( otaexampleTASK_DELAY_MS ) ) ;
suspendTimeout - = otaexampleTASK_DELAY_MS ;
}
if ( OTA_GetState ( ) = = OtaAgentStateSuspended )
if ( OTA_GetState ( ) = = OtaAgentStateSuspended )
{
LogError ( ( " Failed to resume OTA. " ) ) ;
LogError ( ( " Failed to resume OTA. " ) ) ;
status = pdFAIL ;
}
}
else
{
LogError ( ( " Error while trying to resume OTA agent %d " , otaRet ) ) ;
LogError ( ( " Error while trying to resume OTA agent %d " , otaRet ) ) ;
status = pdFAIL ;
}
@ -1910,7 +2035,7 @@ static BaseType_t prvRunOTADemo( void )
otaStatistics . otaPacketsDropped ) ) ;
}
vTaskDelay ( pdMS_TO_TICKS ( otaexampleTASK_DELAY_MS ) ) ;
vTaskDelay ( pdMS_TO_TICKS ( otaexampleTASK_DELAY_MS ) ) ;
}
}
@ -1936,7 +2061,7 @@ static BaseType_t prvRunOTADemo( void )
* the OTA agent . If not , it is simply ignored .
*
*/
static void vOtaDemoTask ( void * pvParam )
static void vOtaDemoTask ( void * pvParam )
{
/* Return error status. */
BaseType_t xReturnStatus = pdPASS ;
@ -1944,7 +2069,7 @@ static void vOtaDemoTask( void* pvParam )
/* Flag for MQTT init status. */
bool mqttInitialized = false ;
( void ) pvParam ;
( void ) pvParam ;
LogInfo ( ( " OTA over MQTT demo, Application version %u.%u.%u " ,
appFirmwareVersion . u . x . major ,
@ -1962,7 +2087,7 @@ static void vOtaDemoTask( void* pvParam )
/****************************** Init MQTT ******************************/
if ( xReturnStatus = = pdPASS )
if ( xReturnStatus = = pdPASS )
{
/* Create the TCP connection to the broker, then the MQTT connection to the
* same . */
@ -1971,7 +2096,7 @@ static void vOtaDemoTask( void* pvParam )
/****************************** Create MQTT Agent Task. ******************************/
if ( xReturnStatus = = pdPASS )
if ( xReturnStatus = = pdPASS )
{
if ( xTaskCreate ( prvMQTTAgentTask ,
" MQTT Agent Task " ,
@ -1987,7 +2112,7 @@ static void vOtaDemoTask( void* pvParam )
/****************************** Start OTA Demo. ******************************/
if ( xReturnStatus = = pdPASS )
if ( xReturnStatus = = pdPASS )
{
/* Start OTA demo. The function returns only if OTA completes successfully and a
* shutdown of OTA is triggered for a manual restart of the device . */
@ -2014,7 +2139,7 @@ static void vOtaDemoTask( void* pvParam )
/*
* @ brief Create the task that demonstrates the Ota demo .
*/
void vStartOtaDemo ( void )
void vStartOtaDemo ( void )
{
/*
* vOtaDemoTask ( ) connects to the MQTT broker , creates the
@ -2022,10 +2147,10 @@ void vStartOtaDemo(void)
* which creates the OTA Agent task .
*/
xTaskCreate ( vOtaDemoTask , /* Function that implements the task. */
xTaskCreate ( vOtaDemoTask , /* Function that implements the task. */
" OTA Demo Task " , /* Text name for the task - only used for debugging. */
democonfigDEMO_STACKSIZE , /* Size of stack (in words, not bytes) to allocate for the task. */
NULL , /* Optional - task parameter - not used in this case. */
tskIDLE_PRIORITY + 1 , /* Task priority, must be between 0 and configMAX_PRIORITIES - 1. */
NULL ) ; /* Optional - used to pass out a handle to the created task. */
NULL ) ; /* Optional - used to pass out a handle to the created task. */
}