@ -142,7 +142,7 @@
* This demo program expects this key to be in the Job document . It is a key
* specific to this demo .
*/
# define jobsexampleQUERY_KEY_FOR_ACTION jobsexampleQUERY_KEY_FOR_JOBS_DOC ". action"
# define jobsexampleQUERY_KEY_FOR_ACTION "action"
/**
* @ brief The length of # jobsexampleQUERY_KEY_FOR_ACTION .
@ -157,7 +157,7 @@
* is either " publish " or " print " . It represents the message that should be
* published or printed , respectively .
*/
# define jobsexampleQUERY_KEY_FOR_MESSAGE jobsexampleQUERY_KEY_FOR_JOBS_DOC ". message"
# define jobsexampleQUERY_KEY_FOR_MESSAGE "message"
/**
* @ brief The length of # jobsexampleQUERY_KEY_FOR_MESSAGE .
@ -172,7 +172,7 @@
* is " publish " . It represents the MQTT topic on which the message should be
* published .
*/
# define jobsexampleQUERY_KEY_FOR_TOPIC jobsexampleQUERY_KEY_FOR_JOBS_DOC ". topic"
# define jobsexampleQUERY_KEY_FOR_TOPIC "topic"
/**
* @ brief The length of # jobsexampleQUERY_KEY_FOR_TOPIC .
@ -181,14 +181,14 @@
/**
* @ brief Utility macro to generate the PUBLISH topic string to the
* Describe Pending JobExecution API of AWS IoT Jobs service for requesting
* Describe JobExecution API of AWS IoT Jobs service for requesting
* the next pending job information .
*
* @ param [ in ] thingName The name of the Thing resource to query for the
* next pending job .
*/
# define START _NEXT_JOB_TOPIC( thingName ) \
( JOBS_API_PREFIX thingName JOBS_API_BRIDGE JOBS_API_ STARTNEXT )
# define DESCRIBE _NEXT_JOB_TOPIC( thingName ) \
( JOBS_API_PREFIX thingName JOBS_API_BRIDGE JOBS_API_ JOBID_NEXT " / " JOBS_API_GETPENDING )
/**
* @ brief Utility macro to generate the subscription topic string for the
@ -239,8 +239,8 @@ typedef enum JobActionType
/*-----------------------------------------------------------*/
/**
* @ brief Each compilation unit that consumes the NetworkContext must define it .
/**
* @ brief Each compilation unit that consumes the NetworkContext must define it .
* It should contain a single pointer to the type of your desired transport .
* When using multiple transports in the same compilation unit , define this pointer as void * .
*
@ -271,14 +271,29 @@ static TlsTransportParams_t xTlsTransportParams;
/**
* @ brief Static buffer used to hold MQTT messages being sent and received .
*/
static uint8_t ucSharedBuffer [ democonfigNETWORK_BUFFER_SIZE ] ;
static uint8_t usMqttConnectionBuffer [ democonfigNETWORK_BUFFER_SIZE ] ;
/**
* @ brief Static buffer used to hold the job ID of the single job that
* is executed at a time in the demo . This buffer allows re - use of the MQTT
* connection context for sending status updates of a job while it is being
* processed .
*/
static uint8_t usJobIdBuffer [ democonfigNETWORK_BUFFER_SIZE ] ;
/**
* @ brief Static buffer used to hold the job document of the single job that
* is executed at a time in the demo . This buffer allows re - use of the MQTT
* connection context for sending status updates of a job while it is being processed .
*/
static uint8_t usJobsDocumentBuffer [ democonfigNETWORK_BUFFER_SIZE ] ;
/**
* @ brief Static buffer used to hold MQTT messages being sent and received .
*/
static MQTTFixedBuffer_t xBuffer =
{
. pBuffer = ucSharedBuffer ,
. pBuffer = u sMqttConnection Buffer,
. size = democonfigNETWORK_BUFFER_SIZE
} ;
@ -325,12 +340,11 @@ static void prvEventCallback( MQTTContext_t * pxMqttContext,
MQTTDeserializedInfo_t * pxDeserializedInfo ) ;
/**
* @ brief Process payload from NextJobExecutionChanged and StartNextPending JobExecution
* @ brief Process payload from NextJobExecutionChanged and Describe JobExecution
* API MQTT topics of AWS IoT Jobs service .
*
* This handler parses the payload received about the next pending job to identify
* the action requested in the job document , and perform the appropriate
* action to execute the job .
* This handler parses the received payload about the next pending job , identifies
* the action requested in the job document , and executes the action .
*
* @ param [ in ] pPublishInfo Deserialized publish info pointer for the incoming
* packet .
@ -354,14 +368,16 @@ static void prvSendUpdateForJob( char * pcJobId,
* It parses the received job document , executes the job depending on the job " Action " type , and
* sends an update to AWS for the Job .
*
* @ param [ in ] pxPublishInfo The PUBLISH packet containing the job document received from the
* AWS IoT Jobs service .
* @ param [ in ] pcJobId The ID of the job to execute .
* @ param [ in ] usJobIdLength The length of the job ID string .
* @ param [ in ] pcJobDocument The JSON document associated with the @ a pcJobID job
* that is to be processed .
* @ param [ in ] usDocumentLength The length of the job document .
*/
static void prvProcessJobDocument ( MQTTPublishInfo_t * pxPublishInfo ,
char * pcJobId ,
uint16_t usJobIdLength ) ;
static void prvProcessJobDocument ( char * pcJobId ,
uint16_t usJobIdLength ,
char * pcJobDocument ,
uint16_t jobDocumentLength ) ;
/**
* @ brief The task used to demonstrate the Jobs library API .
@ -443,19 +459,22 @@ static void prvSendUpdateForJob( char * pcJobId,
}
}
static void prvProcessJobDocument ( MQTTPublishInfo_t * pxPublishInfo ,
char * pcJobId ,
uint16_t usJobIdLength )
static void prvProcessJobDocument ( char * pcJobId ,
uint16_t usJobIdLength ,
char * pcJobDocument ,
uint16_t jobDocumentLength )
{
char * pcAction = NULL ;
size_t uActionLength = 0U ;
JSONStatus_t xJsonStatus = JSONSuccess ;
configASSERT ( pxPublishInfo ! = NULL ) ;
configASSERT ( ( pxPublishInfo - > pPayload ! = NULL ) & & ( pxPublishInfo - > payloadLength > 0 ) ) ;
configASSERT ( pcJobId ! = NULL ) ;
configASSERT ( usJobIdLength > 0 ) ;
configASSERT ( pcJobDocument ! = NULL ) ;
configASSERT ( jobDocumentLength > 0 ) ;
xJsonStatus = JSON_Search ( ( char * ) pxPublishInfo - > pPayload ,
pxPublishInfo- > payload Length,
xJsonStatus = JSON_Search ( pcJobDocument ,
jobDocument Length,
jobsexampleQUERY_KEY_FOR_ACTION ,
jobsexampleQUERY_KEY_FOR_ACTION_LENGTH ,
& pcAction ,
@ -485,8 +504,8 @@ static void prvProcessJobDocument( MQTTPublishInfo_t * pxPublishInfo,
case JOB_ACTION_PRINT :
LogInfo ( ( " Received job contains \" print \" action. " ) ) ;
xJsonStatus = JSON_Search ( ( char * ) pxPublishInfo - > pPayload ,
pxPublishInfo- > payload Length,
xJsonStatus = JSON_Search ( pcJobDocument ,
jobDocument Length,
jobsexampleQUERY_KEY_FOR_MESSAGE ,
jobsexampleQUERY_KEY_FOR_MESSAGE_LENGTH ,
& pcMessage ,
@ -517,8 +536,8 @@ static void prvProcessJobDocument( MQTTPublishInfo_t * pxPublishInfo,
char * pcTopic = NULL ;
size_t ulTopicLength = 0U ;
xJsonStatus = JSON_Search ( ( char * ) pxPublishInfo - > pPayload ,
pxPublishInfo- > payload Length,
xJsonStatus = JSON_Search ( pcJobDocument ,
jobDocument Length,
jobsexampleQUERY_KEY_FOR_TOPIC ,
jobsexampleQUERY_KEY_FOR_TOPIC_LENGTH ,
& pcTopic ,
@ -532,8 +551,8 @@ static void prvProcessJobDocument( MQTTPublishInfo_t * pxPublishInfo,
}
else
{
xJsonStatus = JSON_Search ( ( char * ) pxPublishInfo - > pPayload ,
pxPublishInfo- > payload Length,
xJsonStatus = JSON_Search ( pcJobDocument ,
jobDocument Length,
jobsexampleQUERY_KEY_FOR_MESSAGE ,
jobsexampleQUERY_KEY_FOR_MESSAGE_LENGTH ,
& pcMessage ,
@ -590,7 +609,7 @@ static void prvNextJobHandler( MQTTPublishInfo_t * pxPublishInfo )
else
{
char * pcJobId = NULL ;
size_t ulJobIdLength = 0U ;
size_t ulJobIdLength = 0U L ;
/* Parse the Job ID of the next pending job execution from the JSON payload. */
if ( JSON_Search ( ( char * ) pxPublishInfo - > pPayload ,
@ -607,12 +626,43 @@ static void prvNextJobHandler( MQTTPublishInfo_t * pxPublishInfo )
}
else
{
char * pcJobDocLoc = NULL ;
size_t ulJobDocLength = 0UL ;
configASSERT ( ulJobIdLength < JOBS_JOBID_MAX_LENGTH ) ;
LogInfo ( ( " Received a Job from AWS IoT Jobs service: JobId=%.*s " ,
ulJobIdLength , pcJobId ) ) ;
/* Process the Job document and execute the job. */
prvProcessJobDocument ( pxPublishInfo , pcJobId , ( uint16_t ) ulJobIdLength ) ;
/* Copy the Job ID in the global buffer. This is done so that
* the MQTT context ' s network buffer can be used for sending jobs
* status updates to the AWS IoT Jobs service . */
memcpy ( usJobIdBuffer , pcJobId , ulJobIdLength ) ;
/* Search for the jobs document in the payload. */
if ( JSON_Search ( ( char * ) pxPublishInfo - > pPayload ,
pxPublishInfo - > payloadLength ,
jobsexampleQUERY_KEY_FOR_JOBS_DOC ,
jobsexampleQUERY_KEY_FOR_JOBS_DOC_LENGTH ,
& pcJobDocLoc ,
& ulJobDocLength ) ! = JSONSuccess )
{
LogWarn ( ( " Failed to parse document of next job received from AWS IoT Jobs service: "
" Topic=%.*s, JobID=%.*s " ,
pxPublishInfo - > topicNameLength , pxPublishInfo - > pTopicName ,
ulJobIdLength , pcJobId ) ) ;
}
else
{
/* Copy the Job document in buffer. This is done so that the MQTT connection buffer can
* be used for sending jobs status updates to the AWS IoT Jobs service . */
memcpy ( usJobsDocumentBuffer , pcJobDocLoc , ulJobDocLength ) ;
/* Process the Job document and execute the job. */
prvProcessJobDocument ( usJobIdBuffer ,
( uint16_t ) ulJobIdLength ,
usJobsDocumentBuffer ,
ulJobDocLength ) ;
}
}
}
}
@ -663,7 +713,7 @@ static void prvEventCallback( MQTTContext_t * pxMqttContext,
if ( xStatus = = JobsSuccess )
{
/* Upon successful return, the messageType has been filled in. */
if ( ( topicType = = Jobs StartNext Success ) | | ( topicType = = JobsNextJobChanged ) )
if ( ( topicType = = Jobs Describe Success ) | | ( topicType = = JobsNextJobChanged ) )
{
/* Handler function to process payload. */
prvNextJobHandler ( pxDeserializedInfo - > pPublishInfo ) ;
@ -748,10 +798,10 @@ void vStartJobsDemo( void )
*
* This function also shows that the communication with the AWS IoT Jobs services does
* not require explicit subscriptions to the response MQTT topics for request commands that
* sent to the MQTT APIs ( like StartNextPending JobExecution API ) of the service . The service
* sent to the MQTT APIs ( like Describe JobExecution API ) of the service . The service
* will send messages on the response topics for the request commands on the same
* MQTT connection irrespective of whether the client subscribes to the response topics .
* Therefore , this demo processes incoming messages from response topics of StartNextPending JobExecution
* Therefore , this demo processes incoming messages from response topics of Describe JobExecution
* and UpdateJobExecution APIs without explicitly subscribing to the topics .
*/
void prvJobsDemoTask ( void * pvParameters )
@ -829,23 +879,23 @@ void prvJobsDemoTask( void * pvParameters )
if ( xDemoStatus = = pdPASS )
{
/* Publish to AWS IoT Jobs on the StartNextPending JobExecution API to request the next pending job.
/* Publish to AWS IoT Jobs on the Describe JobExecution API to request the next pending job.
*
* Note : It is not required to make MQTT subscriptions to the response topics of the
* StartNextPending JobExecution API because the AWS IoT Jobs service sends responses for
* Describe JobExecution API because the AWS IoT Jobs service sends responses for
* the PUBLISH commands on the same MQTT connection irrespective of whether the client has subscribed
* to the response topics or not .
* This demo processes incoming messages from the response topics of the API in the prvEventCallback ( )
* handler that is supplied to the coreMQTT library . */
if ( xPublishToTopic ( & xMqttContext ,
START _NEXT_JOB_TOPIC( democonfigTHING_NAME ) ,
sizeof ( START _NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) - 1 ,
DESCRIBE _NEXT_JOB_TOPIC( democonfigTHING_NAME ) ,
sizeof ( DESCRIBE _NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) - 1 ,
NULL ,
0 ) ! = pdPASS )
{
xDemoStatus = pdFAIL ;
LogError ( ( " Failed to publish to StartNextPending JobExecution API of AWS IoT Jobs service: "
" Topic=%s " , START _NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) ) ;
LogError ( ( " Failed to publish to Describe JobExecution API of AWS IoT Jobs service: "
" Topic=%s " , DESCRIBE _NEXT_JOB_TOPIC( democonfigTHING_NAME ) ) ) ;
}
}