diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java index 99a44881b..fb1ac9975 100644 --- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java +++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java @@ -55,9 +55,12 @@ public class RocketMQPollableConsumeApplication { } public interface PolledConsumer { + @Input("pollable-in-0") PollableMessageSource pollable(); + } + @Bean public ApplicationRunner producer() { return args -> { @@ -78,15 +81,18 @@ public class RocketMQPollableConsumeApplication { while (true) { try { if (!destIn.poll((m) -> { - SimpleMsg newPayload = (SimpleMsg)m.getPayload(); + SimpleMsg newPayload = (SimpleMsg) m.getPayload(); System.out.println(newPayload.getMsg()); - }, new ParameterizedTypeReference() {})) { + }, new ParameterizedTypeReference() { + })) { Thread.sleep(1000); } - } catch (Exception e) { + } + catch (Exception e) { // handle failure } } }; } + } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java index 0cbdc543d..f42c6bcb9 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java @@ -65,9 +65,10 @@ public class ExtendedBindingHandlerMappingsProviderConfiguration { } /** - * Register message converter to adapte Spring Cloud Stream. - * Refer to https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters . - * @return + * Register message converter to adapte Spring Cloud Stream. Refer to + * https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters + * . + * @return message converter */ @Bean public MessageConverter rocketMQCustomMessageConverter() { diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java index 76a2ae26e..f16cf7a7c 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java @@ -20,7 +20,12 @@ import java.util.ArrayList; import java.util.List; import org.springframework.messaging.Message; -import org.springframework.messaging.converter.*; +import org.springframework.messaging.converter.AbstractMessageConverter; +import org.springframework.messaging.converter.ByteArrayMessageConverter; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.util.ClassUtils; /** @@ -95,22 +100,25 @@ public class RocketMQMessageConverter extends AbstractMessageConverter { } /** - * Convert the message payload from serialized form to an Object by RocketMQMessageConverter. + * Convert the message payload from serialized form to an Object by + * RocketMQMessageConverter. * @param message the input message * @param targetClass the target class for the conversion - * @param conversionHint an extra object passed to the {@link MessageConverter}, - * e.g. the associated {@code MethodParameter} (may be {@code null}} + * @param conversionHint an extra object passed to the {@link MessageConverter}, e.g. + * the associated {@code MethodParameter} (may be {@code null}} * @return the result of the conversion, or {@code null} if the converter cannot * perform the conversion * @since 4.2 */ @Override - protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { + protected Object convertFromInternal(Message message, Class targetClass, + Object conversionHint) { Object payload = null; for (MessageConverter converter : getMessageConverter().getConverters()) { try { payload = converter.fromMessage(message, targetClass); - } catch (Exception ignore) { + } + catch (Exception ignore) { } if (payload != null) { return payload; @@ -122,5 +130,4 @@ public class RocketMQMessageConverter extends AbstractMessageConverter { return payload; } - }