diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml
index 3c7a9d80e..07edc158e 100644
--- a/spring-cloud-alibaba-examples/pom.xml
+++ b/spring-cloud-alibaba-examples/pom.xml
@@ -44,6 +44,7 @@
rocketmq-example/rocketmq-sql-consume-example
rocketmq-example/rocketmq-example-common
rocketmq-example/rocketmq-tx-example
+ rocketmq-example/rocketmq-pollable-consume-example
rocketmq-example/rocketmq-retrieable-consume-example
spring-cloud-bus-rocketmq-example
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/pom.xml
new file mode 100644
index 000000000..d4f2d1cef
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/pom.xml
@@ -0,0 +1,57 @@
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-alibaba-examples
+ ${revision}
+ ../../pom.xml
+
+ 4.0.0
+
+
+ rocketmq-pollable-consume-example
+ Spring Cloud Starter Stream Alibaba RocketMQ PollableMessageSource Consume Example
+ Example demonstrating how to use rocketmq to produce, and consume by PollableMessageSource
+ jar
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-stream-rocketmq
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-json
+
+
+ com.alibaba.cloud
+ rocketmq-example-common
+ ${revision}
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ ${maven-deploy-plugin.version}
+
+ true
+
+
+
+
+
+
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java
new file mode 100644
index 000000000..bb86f401e
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.orderly;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.alibaba.cloud.examples.common.SimpleMsg;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.binder.PollableMessageSource;
+import org.springframework.cloud.stream.function.StreamBridge;
+import org.springframework.context.annotation.Bean;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQPollableConsumeApplication {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(RocketMQPollableConsumeApplication.class);
+
+ @Autowired
+ private StreamBridge streamBridge;
+
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMQPollableConsumeApplication.class, args);
+ }
+
+ @Bean
+ public ApplicationRunner producer() {
+ return args -> {
+ for (int i = 0; i < 100; i++) {
+ String key = "KEY" + i;
+ Map headers = new HashMap<>();
+ headers.put(MessageConst.PROPERTY_KEYS, key);
+ Message msg = new GenericMessage(
+ new SimpleMsg("Hello RocketMQ " + i), headers);
+ streamBridge.send("producer-out-0", msg);
+ }
+ };
+ }
+
+ @Bean
+ public ApplicationRunner pollable(PollableMessageSource destIn) {
+ return args -> {
+ while (true) {
+ try {
+ if (!destIn.poll((m) -> {
+ SimpleMsg newPayload = (SimpleMsg)m.getPayload();
+ System.out.println(newPayload.toString());
+ }, new ParameterizedTypeReference() {})) {
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ // handle failure
+ }
+ }
+ };
+ }
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml
new file mode 100644
index 000000000..6194c870e
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml
@@ -0,0 +1,25 @@
+server:
+ port: 28089
+spring:
+ application:
+ name: rocketmq-delay-consume-example
+ cloud:
+ stream:
+ pollable-source: pollable
+ rocketmq:
+ binder:
+ name-server: localhost:9876
+ bindings:
+ producer-out-0:
+ producer:
+ group: output_1
+ bindings:
+ producer-out-0:
+ destination: pollable
+ pollable-in-0:
+ destination: pollable
+ group: pollable-group
+
+logging:
+ level:
+ org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
index 30f6fdc56..301df0d6f 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
@@ -28,6 +28,7 @@ import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProv
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
@Configuration
public class ExtendedBindingHandlerMappingsProviderConfiguration {
@@ -63,4 +64,13 @@ public class ExtendedBindingHandlerMappingsProviderConfiguration {
return new RocketMQMessageConverter().getMessageConverter();
}
+ /**
+ * Register message converter to adapte Spring Cloud Stream.
+ * Refer to https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters .
+ * @return
+ */
+ public MessageConverter rocketMQCustomMessageConverter() {
+ return new RocketMQMessageConverter();
+ }
+
}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
index f69290997..76a2ae26e 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
@@ -19,11 +19,8 @@ package com.alibaba.cloud.stream.binder.rocketmq.convert;
import java.util.ArrayList;
import java.util.List;
-import org.springframework.messaging.converter.ByteArrayMessageConverter;
-import org.springframework.messaging.converter.CompositeMessageConverter;
-import org.springframework.messaging.converter.MappingJackson2MessageConverter;
-import org.springframework.messaging.converter.MessageConverter;
-import org.springframework.messaging.converter.StringMessageConverter;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.converter.*;
import org.springframework.util.ClassUtils;
/**
@@ -31,7 +28,7 @@ import org.springframework.util.ClassUtils;
*
* @author zkzlx
*/
-public class RocketMQMessageConverter {
+public class RocketMQMessageConverter extends AbstractMessageConverter {
/**
* if you want to customize a bean, please use the BeanName.
@@ -87,4 +84,43 @@ public class RocketMQMessageConverter {
this.messageConverter = messageConverter;
}
+ /**
+ * support all classes.
+ * @param clazz classes.
+ * @return awayls true.
+ */
+ @Override
+ protected boolean supports(Class> clazz) {
+ return true;
+ }
+
+ /**
+ * Convert the message payload from serialized form to an Object by RocketMQMessageConverter.
+ * @param message the input message
+ * @param targetClass the target class for the conversion
+ * @param conversionHint an extra object passed to the {@link MessageConverter},
+ * e.g. the associated {@code MethodParameter} (may be {@code null}}
+ * @return the result of the conversion, or {@code null} if the converter cannot
+ * perform the conversion
+ * @since 4.2
+ */
+ @Override
+ protected Object convertFromInternal(Message> message, Class> targetClass, Object conversionHint) {
+ Object payload = null;
+ for (MessageConverter converter : getMessageConverter().getConverters()) {
+ try {
+ payload = converter.fromMessage(message, targetClass);
+ } catch (Exception ignore) {
+ }
+ if (payload != null) {
+ return payload;
+ }
+ }
+ if (payload == null && logger.isDebugEnabled()) {
+ logger.debug("Can convert message " + message.toString());
+ }
+ return payload;
+ }
+
+
}