RocketMQ 支持PollMessageSource

pull/2769/head
sorie 3 years ago committed by Steve Rao
parent 35d5fa6e9a
commit b8bcca486f

@ -28,6 +28,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.binder.PollableMessageSource; import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
@ -39,6 +41,7 @@ import org.springframework.messaging.support.GenericMessage;
* @author sorie * @author sorie
*/ */
@SpringBootApplication @SpringBootApplication
@EnableBinding(RocketMQPollableConsumeApplication.PolledConsumer.class)
public class RocketMQPollableConsumeApplication { public class RocketMQPollableConsumeApplication {
private static final Logger log = LoggerFactory private static final Logger log = LoggerFactory
@ -51,6 +54,10 @@ public class RocketMQPollableConsumeApplication {
SpringApplication.run(RocketMQPollableConsumeApplication.class, args); SpringApplication.run(RocketMQPollableConsumeApplication.class, args);
} }
public interface PolledConsumer {
@Input("pollable-in-0")
PollableMessageSource pollable();
}
@Bean @Bean
public ApplicationRunner producer() { public ApplicationRunner producer() {
return args -> { return args -> {
@ -66,13 +73,13 @@ public class RocketMQPollableConsumeApplication {
} }
@Bean @Bean
public ApplicationRunner pollable(PollableMessageSource destIn) { public ApplicationRunner consumer(PollableMessageSource destIn) {
return args -> { return args -> {
while (true) { while (true) {
try { try {
if (!destIn.poll((m) -> { if (!destIn.poll((m) -> {
SimpleMsg newPayload = (SimpleMsg)m.getPayload(); SimpleMsg newPayload = (SimpleMsg)m.getPayload();
System.out.println(newPayload.toString()); System.out.println(newPayload.getMsg());
}, new ParameterizedTypeReference<SimpleMsg>() {})) { }, new ParameterizedTypeReference<SimpleMsg>() {})) {
Thread.sleep(1000); Thread.sleep(1000);
} }

@ -5,7 +5,6 @@ spring:
name: rocketmq-delay-consume-example name: rocketmq-delay-consume-example
cloud: cloud:
stream: stream:
pollable-source: pollable
rocketmq: rocketmq:
binder: binder:
name-server: localhost:9876 name-server: localhost:9876

@ -69,6 +69,7 @@ public class ExtendedBindingHandlerMappingsProviderConfiguration {
* Refer to https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters . * Refer to https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters .
* @return * @return
*/ */
@Bean
public MessageConverter rocketMQCustomMessageConverter() { public MessageConverter rocketMQCustomMessageConverter() {
return new RocketMQMessageConverter(); return new RocketMQMessageConverter();
} }

Loading…
Cancel
Save