diff --git a/.github/workflows/integration-test.yml b/.github/workflows/integration-test.yml
index 767d0cd2b..225a7c6e1 100644
--- a/.github/workflows/integration-test.yml
+++ b/.github/workflows/integration-test.yml
@@ -39,7 +39,9 @@ jobs:
${{ runner.os }}-maven-
- name: Compile & Checkstyle
run: mvn clean compile
+ - name: install dependencies
+ run: mvn clean install -U package -pl '!spring-cloud-alibaba-coverage' -DskipTests
- name: Testing
- run: mvn clean test
+ run: ./mvnw verify -B -Dmaven.test.skip=false
# run: mvn clean -Dit.enabled=true test
diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml
index d0b90f521..cd9dc556a 100644
--- a/spring-cloud-alibaba-examples/pom.xml
+++ b/spring-cloud-alibaba-examples/pom.xml
@@ -41,6 +41,7 @@
rocketmq-example/rocketmq-sql-consume-example
rocketmq-example/rocketmq-example-common
rocketmq-example/rocketmq-tx-example
+ rocketmq-example/rocketmq-pollable-consume-example
spring-cloud-bus-rocketmq-example
spring-cloud-alibaba-sidecar-examples/spring-cloud-alibaba-sidecar-nacos-example
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/pom.xml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/pom.xml
new file mode 100644
index 000000000..d4f2d1cef
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/pom.xml
@@ -0,0 +1,57 @@
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-alibaba-examples
+ ${revision}
+ ../../pom.xml
+
+ 4.0.0
+
+
+ rocketmq-pollable-consume-example
+ Spring Cloud Starter Stream Alibaba RocketMQ PollableMessageSource Consume Example
+ Example demonstrating how to use rocketmq to produce, and consume by PollableMessageSource
+ jar
+
+
+
+
+ com.alibaba.cloud
+ spring-cloud-starter-stream-rocketmq
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+ org.springframework.boot
+ spring-boot-starter-json
+
+
+ com.alibaba.cloud
+ rocketmq-example-common
+ ${revision}
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ ${maven-deploy-plugin.version}
+
+ true
+
+
+
+
+
+
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java
new file mode 100644
index 000000000..59445f009
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/java/com/alibaba/cloud/examples/pollable/RocketMQPollableConsumeApplication.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2013-2022 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.examples.orderly;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.alibaba.cloud.examples.common.SimpleMsg;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.stream.binder.PollableMessageSource;
+import org.springframework.cloud.stream.function.StreamBridge;
+import org.springframework.context.annotation.Bean;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
+
+/**
+ * @author sorie
+ */
+@SpringBootApplication
+public class RocketMQPollableConsumeApplication {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(RocketMQPollableConsumeApplication.class);
+
+ @Autowired
+ private StreamBridge streamBridge;
+
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMQPollableConsumeApplication.class, args);
+ }
+
+ @Bean
+ public ApplicationRunner producer() {
+ return args -> {
+ for (int i = 0; i < 100; i++) {
+ String key = "KEY" + i;
+ Map headers = new HashMap<>();
+ headers.put(MessageConst.PROPERTY_KEYS, key);
+ Message msg = new GenericMessage(
+ new SimpleMsg("Hello RocketMQ " + i), headers);
+ streamBridge.send("producer-out-0", msg);
+ }
+ };
+ }
+
+ @Bean
+ public ApplicationRunner pollableRunner(PollableMessageSource destIn) {
+ return args -> {
+ while (true) {
+ try {
+ if (!destIn.poll((m) -> {
+ SimpleMsg newPayload = (SimpleMsg) m.getPayload();
+ System.out.println(newPayload.getMsg());
+ }, new ParameterizedTypeReference() { })) {
+ Thread.sleep(1000);
+ }
+ }
+ catch (Exception e) {
+ // handle failure
+ }
+ }
+ };
+ }
+}
diff --git a/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml
new file mode 100644
index 000000000..93523f509
--- /dev/null
+++ b/spring-cloud-alibaba-examples/rocketmq-example/rocketmq-pollable-consume-example/src/main/resources/application.yml
@@ -0,0 +1,23 @@
+server:
+ port: 28089
+spring:
+ application:
+ name: rocketmq-pollable-consume-example
+ cloud:
+ stream:
+ pollable-source: pollable
+ rocketmq:
+ binder:
+ name-server: localhost:9876
+ bindings:
+ producer-out-0:
+ producer:
+ group: output_1
+ bindings:
+ producer-out-0:
+ destination: pollable
+ pollable-in-0:
+ destination: pollable
+logging:
+ level:
+ org.springframework.context.support: debug
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosDiscoveryProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosDiscoveryProperties.java
index ef6862d9f..0b789acf7 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosDiscoveryProperties.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/NacosDiscoveryProperties.java
@@ -17,6 +17,7 @@
package com.alibaba.cloud.nacos;
import java.net.Inet4Address;
+import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Enumeration;
@@ -31,6 +32,7 @@ import javax.annotation.PostConstruct;
import com.alibaba.cloud.commons.lang.StringUtils;
import com.alibaba.cloud.nacos.event.NacosDiscoveryInfoChangedEvent;
+import com.alibaba.cloud.nacos.utils.InetIPv6Util;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
@@ -162,6 +164,11 @@ public class NacosDiscoveryProperties {
*/
private String networkInterface = "";
+ /**
+ * choose IPv4 or IPv6,if you don't set it will choose IPv4.
+ */
+ private String ipType = "IPv4";
+
/**
* The port your want to register for your service instance, needn't to set it if the
* auto detect port works well.
@@ -220,6 +227,9 @@ public class NacosDiscoveryProperties {
*/
private boolean failFast = true;
+ @Autowired
+ private InetIPv6Util inetIPv6Util;
+
@Autowired
private InetUtils inetUtils;
@@ -251,7 +261,16 @@ public class NacosDiscoveryProperties {
if (StringUtils.isEmpty(ip)) {
// traversing network interfaces if didn't specify a interface
if (StringUtils.isEmpty(networkInterface)) {
- ip = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
+ if ("IPv4".equalsIgnoreCase(ipType)) {
+ ip = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
+ }
+ else if ("IPv6".equalsIgnoreCase(ipType)) {
+ ip = inetIPv6Util.findIPv6Address();
+ }
+ else {
+ throw new IllegalArgumentException(
+ "please checking the type of IP " + ipType);
+ }
}
else {
NetworkInterface netInterface = NetworkInterface
@@ -265,6 +284,7 @@ public class NacosDiscoveryProperties {
while (inetAddress.hasMoreElements()) {
InetAddress currentAddress = inetAddress.nextElement();
if (currentAddress instanceof Inet4Address
+ || currentAddress instanceof Inet6Address
&& !currentAddress.isLoopbackAddress()) {
ip = currentAddress.getHostAddress();
break;
@@ -364,6 +384,14 @@ public class NacosDiscoveryProperties {
this.ip = ip;
}
+ public String getIpType() {
+ return ipType;
+ }
+
+ public void setIpType(String ipType) {
+ this.ipType = ipType;
+ }
+
public String getNetworkInterface() {
return networkInterface;
}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/utils/InetIPv6Util.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/utils/InetIPv6Util.java
new file mode 100644
index 000000000..12fa32a22
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/utils/InetIPv6Util.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.nacos.utils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.UnknownHostException;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.springframework.cloud.commons.util.InetUtils;
+import org.springframework.cloud.commons.util.InetUtilsProperties;
+
+/**
+ * @author HH
+ */
+public class InetIPv6Util implements Closeable {
+
+ private final ExecutorService executorService;
+
+ private final Log log = LogFactory.getLog(InetIPv6Util.class);
+
+ private final InetUtilsProperties properties;
+
+ @Override
+ public void close() {
+ this.executorService.shutdown();
+ }
+
+ public InetIPv6Util(final InetUtilsProperties properties) {
+ this.properties = properties;
+ this.executorService = Executors.newSingleThreadExecutor((r) -> {
+ Thread thread = new Thread(r);
+ thread.setName("spring.cloud.alibaba.inetIPV6Util");
+ thread.setDaemon(true);
+ return thread;
+ });
+ }
+
+ public InetUtils.HostInfo findFirstNonLoopbackHostInfo() {
+ InetAddress address = this.findFirstNonLoopbackIPv6Address();
+ if (address != null) {
+ return this.convertAddress(address);
+ }
+ else {
+ InetUtils.HostInfo hostInfo = new InetUtils.HostInfo();
+ this.properties.setDefaultIpAddress("0:0:0:0:0:0:0:1");
+ hostInfo.setHostname(this.properties.getDefaultHostname());
+ hostInfo.setIpAddress(this.properties.getDefaultIpAddress());
+ return hostInfo;
+ }
+ }
+
+ public InetAddress findFirstNonLoopbackIPv6Address() {
+ InetAddress address = null;
+
+ try {
+ int lowest = Integer.MAX_VALUE;
+ for (Enumeration nics = NetworkInterface
+ .getNetworkInterfaces(); nics.hasMoreElements(); ) {
+ NetworkInterface ifc = nics.nextElement();
+ if (ifc.isUp()) {
+ log.trace("Testing interface:" + ifc.getDisplayName());
+ if (ifc.getIndex() < lowest || address == null) {
+ lowest = ifc.getIndex();
+ }
+ else if (address != null) {
+ continue;
+ }
+
+ if (!ignoreInterface(ifc.getDisplayName())) {
+ for (Enumeration addrs = ifc
+ .getInetAddresses(); addrs.hasMoreElements(); ) {
+ InetAddress inetAddress = addrs.nextElement();
+ if (inetAddress instanceof Inet6Address
+ && !inetAddress.isLoopbackAddress()
+ && isPreferredAddress(inetAddress)) {
+ log.trace("Found non-loopback interface: "
+ + ifc.getDisplayName());
+ address = inetAddress;
+ }
+ }
+ }
+ }
+ }
+ }
+ catch (IOException e) {
+ log.error("Cannot get first non-loopback address", e);
+ }
+
+ if (address != null) {
+ return address;
+ }
+
+ try {
+ return InetAddress.getLocalHost();
+ }
+ catch (UnknownHostException e) {
+ log.warn("Unable to retrieve localhost");
+ }
+
+ return null;
+ }
+
+ public String findIPv6Address() {
+ String ip = findFirstNonLoopbackHostInfo().getIpAddress();
+ int index = ip.indexOf('%');
+ ip = index > 0 ? ip.substring(0, index) : ip;
+ return iPv6Format(ip);
+ }
+
+ public String iPv6Format(String ip) {
+ return "[" + ip + "]";
+ }
+
+ boolean isPreferredAddress(InetAddress address) {
+ if (this.properties.isUseOnlySiteLocalInterfaces()) {
+ final boolean siteLocalAddress = address.isSiteLocalAddress();
+ if (!siteLocalAddress) {
+ log.trace("Ignoring address" + address.getHostAddress());
+ }
+ return siteLocalAddress;
+ }
+ final List preferredNetworks = this.properties.getPreferredNetworks();
+ if (preferredNetworks.isEmpty()) {
+ return true;
+ }
+ for (String regex : preferredNetworks) {
+ final String hostAddress = address.getHostAddress();
+ if (hostAddress.matches(regex) || hostAddress.startsWith(regex)) {
+ return true;
+ }
+ }
+ log.trace("Ignoring address: " + address.getHostAddress());
+ return false;
+ }
+
+ boolean ignoreInterface(String interfaceName) {
+ for (String regex : this.properties.getIgnoredInterfaces()) {
+ if (interfaceName.matches(regex)) {
+ log.trace("Ignoring interface: " + interfaceName);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public InetUtils.HostInfo convertAddress(final InetAddress address) {
+ InetUtils.HostInfo hostInfo = new InetUtils.HostInfo();
+ Future result = this.executorService.submit(address::getHostName);
+
+ String hostname;
+ try {
+ hostname = result.get(this.properties.getTimeoutSeconds(), TimeUnit.SECONDS);
+ }
+ catch (Exception e) {
+ log.info("Cannot determine local hostname");
+ hostname = "localhost";
+ }
+ hostInfo.setHostname(hostname);
+ hostInfo.setIpAddress(address.getHostAddress());
+ return hostInfo;
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/utils/UtilIPv6AutoConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/utils/UtilIPv6AutoConfiguration.java
new file mode 100644
index 000000000..b914fb912
--- /dev/null
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/java/com/alibaba/cloud/nacos/utils/UtilIPv6AutoConfiguration.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2013-2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.cloud.nacos.utils;
+
+import com.alibaba.cloud.nacos.ConditionalOnNacosDiscoveryEnabled;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.cloud.client.ConditionalOnDiscoveryEnabled;
+import org.springframework.cloud.commons.util.InetUtilsProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author HH
+ */
+@Configuration(proxyBeanMethods = false)
+@ConditionalOnDiscoveryEnabled
+@ConditionalOnNacosDiscoveryEnabled
+public class UtilIPv6AutoConfiguration {
+
+ public UtilIPv6AutoConfiguration() {
+ }
+
+ @Bean
+ @ConditionalOnMissingBean
+ public InetIPv6Util inetIPv6Util(InetUtilsProperties properties) {
+ return new InetIPv6Util(properties);
+ }
+
+}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/resources/META-INF/spring.factories b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/resources/META-INF/spring.factories
index c12ffb14a..eec50df39 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/resources/META-INF/spring.factories
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/main/resources/META-INF/spring.factories
@@ -6,7 +6,8 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
com.alibaba.cloud.nacos.loadbalancer.LoadBalancerNacosAutoConfiguration,\
- com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
+ com.alibaba.cloud.nacos.NacosServiceAutoConfiguration,\
+ com.alibaba.cloud.nacos.utils.UtilIPv6AutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
org.springframework.context.ApplicationListener=\
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryAutoConfigurationTests.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryAutoConfigurationTests.java
index 73d0266ce..d4bc6c454 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryAutoConfigurationTests.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryAutoConfigurationTests.java
@@ -18,6 +18,7 @@ package com.alibaba.cloud.nacos.discovery;
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.NacosServiceAutoConfiguration;
+import com.alibaba.cloud.nacos.utils.UtilIPv6AutoConfiguration;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
@@ -33,6 +34,7 @@ public class NacosDiscoveryAutoConfigurationTests {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(UtilAutoConfiguration.class,
+ UtilIPv6AutoConfiguration.class,
NacosServiceAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class));
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfigurationTest.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfigurationTest.java
index b66572905..26dfe5ff9 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfigurationTest.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/NacosDiscoveryClientConfigurationTest.java
@@ -18,6 +18,7 @@ package com.alibaba.cloud.nacos.discovery;
import com.alibaba.cloud.nacos.NacosServiceAutoConfiguration;
import com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration;
+import com.alibaba.cloud.nacos.utils.UtilIPv6AutoConfiguration;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
@@ -41,6 +42,7 @@ public class NacosDiscoveryClientConfigurationTest {
AutoServiceRegistrationConfiguration.class,
NacosServiceRegistryAutoConfiguration.class,
UtilAutoConfiguration.class,
+ UtilIPv6AutoConfiguration.class,
NacosServiceAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class,
NacosDiscoveryClientConfiguration.class, this.getClass()));
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/reactive/NacosReactiveDiscoveryClientConfigurationTests.java b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/reactive/NacosReactiveDiscoveryClientConfigurationTests.java
index 519f94977..88989888e 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/reactive/NacosReactiveDiscoveryClientConfigurationTests.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-alibaba-nacos-discovery/src/test/java/com/alibaba/cloud/nacos/discovery/reactive/NacosReactiveDiscoveryClientConfigurationTests.java
@@ -18,6 +18,7 @@ package com.alibaba.cloud.nacos.discovery.reactive;
import com.alibaba.cloud.nacos.NacosServiceAutoConfiguration;
import com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration;
+import com.alibaba.cloud.nacos.utils.UtilIPv6AutoConfiguration;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations;
@@ -34,6 +35,7 @@ public class NacosReactiveDiscoveryClientConfigurationTests {
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(UtilAutoConfiguration.class,
+ UtilIPv6AutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class,
NacosServiceAutoConfiguration.class,
NacosReactiveDiscoveryClientConfiguration.class));
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
index 4b669dd99..f83eac76f 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -16,7 +16,6 @@
package com.alibaba.cloud.stream.binder.rocketmq;
-import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter;
@@ -125,7 +124,7 @@ public class RocketMQMessageChannelBinder extends
throw new RuntimeException(
"group must be configured for DLQ" + destination.getName());
}
- group = anonymous ? anonymousGroup(destination.getName()) : group;
+ group = anonymous ? RocketMQUtils.anonymousGroup(destination.getName()) : group;
RocketMQUtils.mergeRocketMQProperties(binderConfigurationProperties,
extendedConsumerProperties.getExtension());
@@ -182,15 +181,6 @@ public class RocketMQMessageChannelBinder extends
};
}
- /**
- * generate anonymous group.
- * @param destination not null
- * @return anonymous group name.
- */
- private static String anonymousGroup(final String destination) {
- return RocketMQConst.DEFAULT_GROUP + "_" + destination;
- }
-
/**
* Binders can return an {@link ErrorMessageStrategy} for building error messages;
* binder implementations typically might add extra headers to the error message.
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
index df7374b06..cab038b16 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/autoconfigurate/ExtendedBindingHandlerMappingsProviderConfiguration.java
@@ -28,6 +28,7 @@ import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProv
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
@Configuration
public class ExtendedBindingHandlerMappingsProviderConfiguration {
@@ -62,4 +63,14 @@ public class ExtendedBindingHandlerMappingsProviderConfiguration {
return new RocketMQMessageConverter().getMessageConverter();
}
+ /**
+ * Register message converter to adapte Spring Cloud Stream.
+ * Refer to https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-user-defined-message-converters .
+ * @return message converter.
+ */
+ @Bean
+ public MessageConverter rocketMQCustomMessageConverter() {
+ return new RocketMQMessageConverter();
+ }
+
}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
index f69290997..20133f571 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/convert/RocketMQMessageConverter.java
@@ -19,6 +19,8 @@ package com.alibaba.cloud.stream.binder.rocketmq.convert;
import java.util.ArrayList;
import java.util.List;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
@@ -31,7 +33,7 @@ import org.springframework.util.ClassUtils;
*
* @author zkzlx
*/
-public class RocketMQMessageConverter {
+public class RocketMQMessageConverter extends AbstractMessageConverter {
/**
* if you want to customize a bean, please use the BeanName.
@@ -87,4 +89,44 @@ public class RocketMQMessageConverter {
this.messageConverter = messageConverter;
}
+ /**
+ * support all classes.
+ * @param clazz classes.
+ * @return awayls true.
+ */
+ @Override
+ protected boolean supports(Class> clazz) {
+ return true;
+ }
+
+ /**
+ * Convert the message payload from serialized form to an Object by RocketMQMessageConverter.
+ * @param message the input message
+ * @param targetClass the target class for the conversion
+ * @param conversionHint an extra object passed to the {@link MessageConverter},
+ * e.g. the associated {@code MethodParameter} (may be {@code null}}
+ * @return the result of the conversion, or {@code null} if the converter cannot
+ * perform the conversion
+ * @since 4.2
+ */
+ @Override
+ protected Object convertFromInternal(Message> message, Class> targetClass, Object conversionHint) {
+ Object payload = null;
+ for (MessageConverter converter : getMessageConverter().getConverters()) {
+ try {
+ payload = converter.fromMessage(message, targetClass);
+ }
+ catch (Exception ignore) {
+ }
+ if (payload != null) {
+ return payload;
+ }
+ }
+ if (payload == null && logger.isDebugEnabled()) {
+ logger.debug("Can convert message " + message.toString());
+ }
+ return payload;
+ }
+
+
}
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java
index b4e5b1529..88a2b0fcd 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
@@ -95,15 +96,28 @@ public final class RocketMQConsumerFactory {
/**
* todo Compatible with versions less than 4.6 ?
+ * @param topic consumer topic.
* @param extendedConsumerProperties extendedConsumerProperties
* @return DefaultLitePullConsumer
*/
public static DefaultLitePullConsumer initPullConsumer(
+ String topic,
ExtendedConsumerProperties extendedConsumerProperties) {
RocketMQConsumerProperties consumerProperties = extendedConsumerProperties
.getExtension();
- Assert.notNull(consumerProperties.getGroup(),
- "Property 'group' is required - consumerGroup");
+ boolean anonymous = !StringUtils.hasLength(consumerProperties.getGroup());
+ /***
+ * When using DLQ, at least the group property must be provided for proper naming of the DLQ destination
+ * According to https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference
+ */
+ if (anonymous && NamespaceUtil.isDLQTopic(topic)) {
+ throw new RuntimeException(
+ "group must be configured for DLQ" + topic);
+ }
+ if (anonymous) {
+ consumerProperties.setGroup(RocketMQUtils.anonymousGroup(topic));
+ }
+
Assert.notNull(consumerProperties.getNameServer(),
"Property 'nameServer' is required");
AllocateMessageQueueStrategy allocateMessageQueueStrategy = RocketMQBeanContainerCache
diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java
index c94a3ee87..286256260 100644
--- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java
+++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java
@@ -87,7 +87,7 @@ public class RocketMQMessageSource extends AbstractMessageSource