@ -555,30 +555,13 @@ public class RocketMQDelayConsumeApplication {
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
// Set the delay level 1~10
headers.put("DELAY" , 2);
headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL , 2);
Message< SimpleMsg > msg = new GenericMessage(new SimpleMsg("Delay RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public ApplicationRunner producerSchedule() {
return args -> {
for (int i = 0; i < 100 ; i + + ) {
String key = "KEY" + i;
Map< String , Object > headers = new HashMap< >();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
// Sending scheduled message, you need to set the delay time in milliseconds (ms). The message will be delivered after the specified delay time, for example, the message will be delivered after 3 seconds.
long delayTime = System.currentTimeMillis() + 3000;
headers.put(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, delayTime);
Message< SimpleMsg > msg = new GenericMessage(new SimpleMsg("Schedule RocketMQ " + i), headers);
streamBridge.send("producer-out-0", msg);
}
};
}
@Bean
public Consumer< Message < SimpleMsg > > consumer() {
return msg -> {