diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java index 3cd7c85bd..6665ea7bf 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQBinderUtils.java @@ -16,9 +16,13 @@ package com.alibaba.cloud.stream.binder.rocketmq; +import java.util.Arrays; +import java.util.List; + import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; /** @@ -38,7 +42,7 @@ public final class RocketMQBinderUtils { result.setNameServer(rocketBinderConfigurationProperties.getNameServer()); } else { - result.setNameServer(rocketMQProperties.getNameServer()); + result.setNameServer(Arrays.asList(rocketMQProperties.getNameServer().split(";"))); } if (rocketMQProperties.getProducer() == null || StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) { @@ -74,4 +78,11 @@ public final class RocketMQBinderUtils { return result; } + public static String getNameServerStr(List nameServerList) { + if (CollectionUtils.isEmpty(nameServerList)) { + return null; + } + return String.join(";", nameServerList); + } + } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 73629f19d..b46acc9c8 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -144,7 +144,8 @@ public class RocketMQMessageChannelBinder extends producer.setVipChannelEnabled( producerProperties.getExtension().getVipChannelEnabled()); } - producer.setNamesrvAddr(mergedProperties.getNameServer()); + producer.setNamesrvAddr(RocketMQBinderUtils + .getNameServerStr(mergedProperties.getNameServer())); producer.setSendMsgTimeout( producerProperties.getExtension().getSendMessageTimeout()); producer.setRetryTimesWhenSendFailed( diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 0e384b3e0..fb167c697 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -19,6 +19,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.consuming; import java.util.List; import java.util.Objects; +import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils; import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; @@ -83,7 +84,7 @@ public class RocketMQListenerBindingContainer */ private int delayLevelWhenNextConsume = 0; - private String nameServer; + private List nameServer; private String consumerGroup; @@ -233,7 +234,7 @@ public class RocketMQListenerBindingContainer rocketBinderConfigurationProperties.getCustomizedTraceTopic()); } - consumer.setNamesrvAddr(nameServer); + consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer)); consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); @@ -304,11 +305,11 @@ public class RocketMQListenerBindingContainer this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; } - public String getNameServer() { + public List getNameServer() { return nameServer; } - public void setNameServer(String nameServer) { + public void setNameServer(List nameServer) { this.nameServer = nameServer; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java index 7fe459a70..dd95864e0 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageSource.java @@ -19,6 +19,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration; import java.util.List; import java.util.Set; +import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils; import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; @@ -103,8 +104,8 @@ public class RocketMQMessageSource extends AbstractMessageSource } try { consumer = new DefaultMQPullConsumer(group); - consumer.setNamesrvAddr( - rocketMQBinderConfigurationProperties.getNameServer()); + consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr( + rocketMQBinderConfigurationProperties.getNameServer())); consumer.setConsumerPullTimeoutMillis( rocketMQConsumerProperties.getExtension().getPullTimeout()); consumer.setMessageModel(MessageModel.CLUSTERING); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java index 6bfc63652..3a9dcb403 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQBinderConfigurationProperties.java @@ -16,28 +16,26 @@ package com.alibaba.cloud.stream.binder.rocketmq.properties; -import javax.validation.constraints.Pattern; +import java.util.Arrays; +import java.util.List; import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import org.apache.rocketmq.common.MixAll; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.validation.annotation.Validated; /** * @author Timur Valiev * @author Jim */ @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") -@Validated public class RocketMQBinderConfigurationProperties { /** - * The name server for rocketMQ, formats: `host:port;host:port`. + * The name server list for rocketMQ. */ - @Pattern(regexp = "^[\\d.:;]+$", - message = "nameServer needs to match expression \"host:port;host:port\"") - private String nameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER; + private List nameServer = Arrays + .asList(RocketMQBinderConstants.DEFAULT_NAME_SERVER); /** * The property of "access-key". @@ -60,11 +58,11 @@ public class RocketMQBinderConfigurationProperties { */ private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC; - public String getNameServer() { + public List getNameServer() { return nameServer; } - public void setNameServer(String nameServer) { + public void setNameServer(List nameServer) { this.nameServer = nameServer; } diff --git a/spring-cloud-stream-binder-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java b/spring-cloud-stream-binder-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java index d4f7defa7..8207f9893 100644 --- a/spring-cloud-stream-binder-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java +++ b/spring-cloud-stream-binder-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQAutoConfigurationTests.java @@ -16,6 +16,8 @@ package com.alibaba.cloud.stream.binder.rocketmq; +import java.util.Arrays; + import com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; @@ -35,7 +37,8 @@ public class RocketMQAutoConfigurationTests { .withConfiguration( AutoConfigurations.of(RocketMQBinderAutoConfiguration.class)) .withPropertyValues( - "spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876", + "spring.cloud.stream.rocketmq.binder.name-server[0]=127.0.0.1:9876", + "spring.cloud.stream.rocketmq.binder.name-server[1]=127.0.0.1:9877", "spring.cloud.stream.bindings.output.destination=TopicOrderTest", "spring.cloud.stream.bindings.output.content-type=application/json", "spring.cloud.stream.bindings.input1.destination=TopicOrderTest", @@ -55,7 +58,7 @@ public class RocketMQAutoConfigurationTests { RocketMQBinderConfigurationProperties binderConfigurationProperties = context .getBean(RocketMQBinderConfigurationProperties.class); assertThat(binderConfigurationProperties.getNameServer()) - .isEqualTo("127.0.0.1:9876"); + .isEqualTo(Arrays.asList("127.0.0.1:9876", "127.0.0.1:9877")); RocketMQExtendedBindingProperties bindingProperties = context .getBean(RocketMQExtendedBindingProperties.class); assertThat(