From a2a006b53b90b71c5b2d2f1c58e8f6928680034c Mon Sep 17 00:00:00 2001
From: sorie <gsoriee@gmail.com>
Date: Sun, 10 Jul 2022 13:59:52 +0800
Subject: [PATCH] =?UTF-8?q?RocketMQ=20=E6=94=AF=E6=8C=81PollMessageSource?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../RocketMQPollableConsumeApplication.java   | 12 ++++++++---
 ...gHandlerMappingsProviderConfiguration.java |  7 ++++---
 .../convert/RocketMQMessageConverter.java     | 21 ++++++++++++-------
 3 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java
index 99a44881b..fb1ac9975 100644
--- a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java
@@ -55,9 +55,12 @@ public class RocketMQPollableConsumeApplication {
 	}
 
 	public interface PolledConsumer {
+
 		@Input("pollable-in-0")
 		PollableMessageSource pollable();
+
 	}
+
 	@Bean
 	public ApplicationRunner producer() {
 		return args -> {
@@ -78,15 +81,18 @@ public class RocketMQPollableConsumeApplication {
 			while (true) {
 				try {
 					if (!destIn.poll((m) -> {
-						SimpleMsg newPayload = (SimpleMsg)m.getPayload();
+						SimpleMsg newPayload = (SimpleMsg) m.getPayload();
 						System.out.println(newPayload.getMsg());
-					}, new ParameterizedTypeReference<SimpleMsg>() {})) {
+					}, new ParameterizedTypeReference<SimpleMsg>() {
+					})) {
 						Thread.sleep(1000);
 					}
-				} catch (Exception e) {
+				}
+				catch (Exception e) {
 					// handle failure
 				}
 			}
 		};
 	}
+
 }
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
index 0cbdc543d..f42c6bcb9 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
@@ -65,9 +65,10 @@ public class ExtendedBindingHandlerMappingsProviderConfiguration {
 	}
 
 	/**
-	 * Register message converter to adapte Spring Cloud Stream.
-	 * Refer to https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters .
-	 * @return
+	 * Register message converter to adapte Spring Cloud Stream. Refer to
+	 * https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters
+	 * .
+	 * @return message converter
 	 */
 	@Bean
 	public MessageConverter rocketMQCustomMessageConverter() {
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
index 76a2ae26e..f16cf7a7c 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
@@ -20,7 +20,12 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.springframework.messaging.Message;
-import org.springframework.messaging.converter.*;
+import org.springframework.messaging.converter.AbstractMessageConverter;
+import org.springframework.messaging.converter.ByteArrayMessageConverter;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
+import org.springframework.messaging.converter.StringMessageConverter;
 import org.springframework.util.ClassUtils;
 
 /**
@@ -95,22 +100,25 @@ public class RocketMQMessageConverter extends AbstractMessageConverter {
 	}
 
 	/**
-	 * Convert the message payload from serialized form to an Object by RocketMQMessageConverter.
+	 * Convert the message payload from serialized form to an Object by
+	 * RocketMQMessageConverter.
 	 * @param message the input message
 	 * @param targetClass the target class for the conversion
-	 * @param conversionHint an extra object passed to the {@link MessageConverter},
-	 * e.g. the associated {@code MethodParameter} (may be {@code null}}
+	 * @param conversionHint an extra object passed to the {@link MessageConverter}, e.g.
+	 * the associated {@code MethodParameter} (may be {@code null}}
 	 * @return the result of the conversion, or {@code null} if the converter cannot
 	 * perform the conversion
 	 * @since 4.2
 	 */
 	@Override
-	protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
+	protected Object convertFromInternal(Message<?> message, Class<?> targetClass,
+			Object conversionHint) {
 		Object payload = null;
 		for (MessageConverter converter : getMessageConverter().getConverters()) {
 			try {
 				payload = converter.fromMessage(message, targetClass);
-			} catch (Exception ignore) {
+			}
+			catch (Exception ignore) {
 			}
 			if (payload != null) {
 				return payload;
@@ -122,5 +130,4 @@ public class RocketMQMessageConverter extends AbstractMessageConverter {
 		return payload;
 	}
 
-
 }