Merge pull request #1 from alibaba/master

update
pull/1167/head
yuhuangbin 5 years ago committed by GitHub
commit c7f1d02e21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -16,9 +16,13 @@
package com.alibaba.cloud.stream.binder.rocketmq; package com.alibaba.cloud.stream.binder.rocketmq;
import java.util.Arrays;
import java.util.List;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
/** /**
@ -38,7 +42,7 @@ public final class RocketMQBinderUtils {
result.setNameServer(rocketBinderConfigurationProperties.getNameServer()); result.setNameServer(rocketBinderConfigurationProperties.getNameServer());
} }
else { else {
result.setNameServer(rocketMQProperties.getNameServer()); result.setNameServer(Arrays.asList(rocketMQProperties.getNameServer().split(";")));
} }
if (rocketMQProperties.getProducer() == null if (rocketMQProperties.getProducer() == null
|| StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) { || StringUtils.isEmpty(rocketMQProperties.getProducer().getAccessKey())) {
@ -74,4 +78,11 @@ public final class RocketMQBinderUtils {
return result; return result;
} }
public static String getNameServerStr(List<String> nameServerList) {
if (CollectionUtils.isEmpty(nameServerList)) {
return null;
}
return String.join(";", nameServerList);
}
} }

@ -144,7 +144,8 @@ public class RocketMQMessageChannelBinder extends
producer.setVipChannelEnabled( producer.setVipChannelEnabled(
producerProperties.getExtension().getVipChannelEnabled()); producerProperties.getExtension().getVipChannelEnabled());
} }
producer.setNamesrvAddr(mergedProperties.getNameServer()); producer.setNamesrvAddr(RocketMQBinderUtils
.getNameServerStr(mergedProperties.getNameServer()));
producer.setSendMsgTimeout( producer.setSendMsgTimeout(
producerProperties.getExtension().getSendMessageTimeout()); producerProperties.getExtension().getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed( producer.setRetryTimesWhenSendFailed(

@ -19,6 +19,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.consuming;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder; import com.alibaba.cloud.stream.binder.rocketmq.RocketMQMessageChannelBinder;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
@ -83,7 +84,7 @@ public class RocketMQListenerBindingContainer
*/ */
private int delayLevelWhenNextConsume = 0; private int delayLevelWhenNextConsume = 0;
private String nameServer; private List<String> nameServer;
private String consumerGroup; private String consumerGroup;
@ -233,7 +234,7 @@ public class RocketMQListenerBindingContainer
rocketBinderConfigurationProperties.getCustomizedTraceTopic()); rocketBinderConfigurationProperties.getCustomizedTraceTopic());
} }
consumer.setNamesrvAddr(nameServer); consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(nameServer));
consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMax(rocketMQConsumerProperties.getConcurrency());
consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency()); consumer.setConsumeThreadMin(rocketMQConsumerProperties.getConcurrency());
@ -304,11 +305,11 @@ public class RocketMQListenerBindingContainer
this.delayLevelWhenNextConsume = delayLevelWhenNextConsume; this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
} }
public String getNameServer() { public List<String> getNameServer() {
return nameServer; return nameServer;
} }
public void setNameServer(String nameServer) { public void setNameServer(List<String> nameServer) {
this.nameServer = nameServer; this.nameServer = nameServer;
} }

@ -19,6 +19,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderUtils;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser; import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQMessageQueueChooser;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
@ -103,8 +104,8 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
} }
try { try {
consumer = new DefaultMQPullConsumer(group); consumer = new DefaultMQPullConsumer(group);
consumer.setNamesrvAddr( consumer.setNamesrvAddr(RocketMQBinderUtils.getNameServerStr(
rocketMQBinderConfigurationProperties.getNameServer()); rocketMQBinderConfigurationProperties.getNameServer()));
consumer.setConsumerPullTimeoutMillis( consumer.setConsumerPullTimeoutMillis(
rocketMQConsumerProperties.getExtension().getPullTimeout()); rocketMQConsumerProperties.getExtension().getPullTimeout());
consumer.setMessageModel(MessageModel.CLUSTERING); consumer.setMessageModel(MessageModel.CLUSTERING);

@ -16,28 +16,26 @@
package com.alibaba.cloud.stream.binder.rocketmq.properties; package com.alibaba.cloud.stream.binder.rocketmq.properties;
import javax.validation.constraints.Pattern; import java.util.Arrays;
import java.util.List;
import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants; import com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/** /**
* @author Timur Valiev * @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
*/ */
@ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder") @ConfigurationProperties(prefix = "spring.cloud.stream.rocketmq.binder")
@Validated
public class RocketMQBinderConfigurationProperties { public class RocketMQBinderConfigurationProperties {
/** /**
* The name server for rocketMQ, formats: `host:port;host:port`. * The name server list for rocketMQ.
*/ */
@Pattern(regexp = "^[\\d.:;]+$", private List<String> nameServer = Arrays
message = "nameServer needs to match expression \"host:port;host:port\"") .asList(RocketMQBinderConstants.DEFAULT_NAME_SERVER);
private String nameServer = RocketMQBinderConstants.DEFAULT_NAME_SERVER;
/** /**
* The property of "access-key". * The property of "access-key".
@ -60,11 +58,11 @@ public class RocketMQBinderConfigurationProperties {
*/ */
private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC; private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
public String getNameServer() { public List<String> getNameServer() {
return nameServer; return nameServer;
} }
public void setNameServer(String nameServer) { public void setNameServer(List<String> nameServer) {
this.nameServer = nameServer; this.nameServer = nameServer;
} }

@ -16,6 +16,8 @@
package com.alibaba.cloud.stream.binder.rocketmq; package com.alibaba.cloud.stream.binder.rocketmq;
import java.util.Arrays;
import com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration; import com.alibaba.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfiguration;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
@ -35,7 +37,8 @@ public class RocketMQAutoConfigurationTests {
.withConfiguration( .withConfiguration(
AutoConfigurations.of(RocketMQBinderAutoConfiguration.class)) AutoConfigurations.of(RocketMQBinderAutoConfiguration.class))
.withPropertyValues( .withPropertyValues(
"spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876", "spring.cloud.stream.rocketmq.binder.name-server[0]=127.0.0.1:9876",
"spring.cloud.stream.rocketmq.binder.name-server[1]=127.0.0.1:9877",
"spring.cloud.stream.bindings.output.destination=TopicOrderTest", "spring.cloud.stream.bindings.output.destination=TopicOrderTest",
"spring.cloud.stream.bindings.output.content-type=application/json", "spring.cloud.stream.bindings.output.content-type=application/json",
"spring.cloud.stream.bindings.input1.destination=TopicOrderTest", "spring.cloud.stream.bindings.input1.destination=TopicOrderTest",
@ -55,7 +58,7 @@ public class RocketMQAutoConfigurationTests {
RocketMQBinderConfigurationProperties binderConfigurationProperties = context RocketMQBinderConfigurationProperties binderConfigurationProperties = context
.getBean(RocketMQBinderConfigurationProperties.class); .getBean(RocketMQBinderConfigurationProperties.class);
assertThat(binderConfigurationProperties.getNameServer()) assertThat(binderConfigurationProperties.getNameServer())
.isEqualTo("127.0.0.1:9876"); .isEqualTo(Arrays.asList("127.0.0.1:9876", "127.0.0.1:9877"));
RocketMQExtendedBindingProperties bindingProperties = context RocketMQExtendedBindingProperties bindingProperties = context
.getBean(RocketMQExtendedBindingProperties.class); .getBean(RocketMQExtendedBindingProperties.class);
assertThat( assertThat(

Loading…
Cancel
Save