diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java index bb86f401e..99a44881b 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java @@ -28,6 +28,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.binder.PollableMessageSource; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.annotation.Bean; @@ -39,6 +41,7 @@ import org.springframework.messaging.support.GenericMessage; * @author sorie */ @SpringBootApplication +@EnableBinding(RocketMQPollableConsumeApplication.PolledConsumer.class) public class RocketMQPollableConsumeApplication { private static final Logger log = LoggerFactory @@ -51,6 +54,10 @@ public class RocketMQPollableConsumeApplication { SpringApplication.run(RocketMQPollableConsumeApplication.class, args); } + public interface PolledConsumer { + @Input("pollable-in-0") + PollableMessageSource pollable(); + } @Bean public ApplicationRunner producer() { return args -> { @@ -66,13 +73,13 @@ public class RocketMQPollableConsumeApplication { } @Bean - public ApplicationRunner pollable(PollableMessageSource destIn) { + public ApplicationRunner consumer(PollableMessageSource destIn) { return args -> { while (true) { try { if (!destIn.poll((m) -> { SimpleMsg newPayload = (SimpleMsg)m.getPayload(); - System.out.println(newPayload.toString()); + System.out.println(newPayload.getMsg()); }, new ParameterizedTypeReference() {})) { Thread.sleep(1000); } diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml index 6194c870e..7447538b3 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml @@ -5,7 +5,6 @@ spring: name: rocketmq-delay-consume-example cloud: stream: - pollable-source: pollable rocketmq: binder: name-server: localhost:9876 diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java index 301df0d6f..0cbdc543d 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java @@ -69,6 +69,7 @@ public class ExtendedBindingHandlerMappingsProviderConfiguration { * Refer to https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters . * @return */ + @Bean public MessageConverter rocketMQCustomMessageConverter() { return new RocketMQMessageConverter(); }