From 6e59ee2b99fbe81e3052e24639d26e41b96dcd4a Mon Sep 17 00:00:00 2001 From: sorie Date: Sun, 22 May 2022 11:20:07 +0800 Subject: [PATCH 1/6] rocketmq multi-broker offset error --- .../inbound/pull/RocketMQMessageSource.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java index d0f6c6234..bac250c41 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java @@ -44,6 +44,7 @@ import org.springframework.integration.endpoint.AbstractMessageSource; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; /** * @author Jim @@ -109,13 +110,14 @@ public class RocketMQMessageSource extends AbstractMessageSource this.running = true; } - private MessageQueue acquireCurrentMessageQueue(String topic, int queueId) { - Collection messageQueueSet = messageQueuesForTopic.get(topic); - if (CollectionUtils.isEmpty(messageQueueSet)) { + private MessageQueue acquireCurrentMessageQueue(String topic,int queueId, String brokerName) { + Collection messageQueueSet = messageQueuesForTopic.get(topic); + if(CollectionUtils.isEmpty(messageQueueSet)){ return null; } for (MessageQueue messageQueue : messageQueueSet) { - if (messageQueue.getQueueId() == queueId) { + if (messageQueue.getQueueId() == queueId && + ObjectUtils.nullSafeEquals(brokerName, messageQueue.getBrokerName())) { return messageQueue; } } @@ -152,8 +154,8 @@ public class RocketMQMessageSource extends AbstractMessageSource if (null == messageExt) { return null; } - MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(), - messageExt.getQueueId()); + MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(), messageExt.getQueueId(), + messageExt.getBrokerName()); if (messageQueue == null) { throw new IllegalArgumentException( "The message queue is not in assigned list"); From 5dfbbd423ea36997fb443aeb857e9957d1431591 Mon Sep 17 00:00:00 2001 From: sorie <819294006@qq.com> Date: Sun, 22 May 2022 11:54:11 +0800 Subject: [PATCH 2/6] rocketmq multi-broker offset error --- .../inbound/pull/RocketMQMessageSource.java | 15 ++++++++------- .../support/RocketMQMessageConverterSupport.java | 7 +++++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java index bac250c41..c94a3ee87 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java @@ -110,14 +110,15 @@ public class RocketMQMessageSource extends AbstractMessageSource this.running = true; } - private MessageQueue acquireCurrentMessageQueue(String topic,int queueId, String brokerName) { - Collection messageQueueSet = messageQueuesForTopic.get(topic); - if(CollectionUtils.isEmpty(messageQueueSet)){ + private MessageQueue acquireCurrentMessageQueue(String topic, int queueId, + String brokerName) { + Collection messageQueueSet = messageQueuesForTopic.get(topic); + if (CollectionUtils.isEmpty(messageQueueSet)) { return null; } for (MessageQueue messageQueue : messageQueueSet) { - if (messageQueue.getQueueId() == queueId && - ObjectUtils.nullSafeEquals(brokerName, messageQueue.getBrokerName())) { + if (messageQueue.getQueueId() == queueId && ObjectUtils + .nullSafeEquals(brokerName, messageQueue.getBrokerName())) { return messageQueue; } } @@ -154,8 +155,8 @@ public class RocketMQMessageSource extends AbstractMessageSource if (null == messageExt) { return null; } - MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(), messageExt.getQueueId(), - messageExt.getBrokerName()); + MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(), + messageExt.getQueueId(), messageExt.getBrokerName()); if (messageQueue == null) { throw new IllegalArgumentException( "The message queue is not in assigned list"); diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java index ace7a901d..7fde92078 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java @@ -175,8 +175,11 @@ public final class RocketMQMessageConverterSupport { .filter(entry -> !Objects.equals(entry.getKey(), Headers.FLAG)) .forEach(entry -> { if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) { - rocketMsg.putUserProperty(entry.getKey(), - String.valueOf(entry.getValue())); + String val = String.valueOf(entry.getValue()); + // Remove All blank header(rocketmq not support). + if (org.apache.commons.lang3.StringUtils.isNotBlank(val)) { + rocketMsg.putUserProperty(entry.getKey(), val); + } } }); From 5b23dc2e96651e8077888b8d218ffe543b6d1262 Mon Sep 17 00:00:00 2001 From: sorie Date: Wed, 25 May 2022 16:46:25 +0800 Subject: [PATCH 3/6] rocketmq blank header support test --- .../RocketMQMessageConverterSupportTest.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java new file mode 100644 index 000000000..afedda5a3 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java @@ -0,0 +1,30 @@ +package com.alibaba.cloud.stream.binder.rocketmq; + +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport; +import org.apache.rocketmq.common.message.MessageConst; +import org.junit.jupiter.api.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Sorie + */ +public class RocketMQMessageConverterSupportTest { + + @Test + public void convertMessage2MQBlankHeaderTest() { + String destination = "test"; + Message message = MessageBuilder.withPayload("msg") + .setHeader(MessageConst.PROPERTY_TAGS, "a") + .setHeader("test", "") + .build(); + org.apache.rocketmq.common.message.Message rkmqMsg = + RocketMQMessageConverterSupport.convertMessage2MQ(destination, message); + String testProp = rkmqMsg.getProperty("test"); + String tagProp = rkmqMsg.getProperty(MessageConst.PROPERTY_TAGS); + assertThat(testProp).isNull(); + assertThat(tagProp).isEqualTo("a"); + } +} From 5ea6eb74e69131ae5c01baf33192431ec5e6403c Mon Sep 17 00:00:00 2001 From: sorie Date: Wed, 25 May 2022 19:45:25 +0800 Subject: [PATCH 4/6] rocketmq multi-broker offset error code style --- .../RocketMQMessageConverterSupportTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java index afedda5a3..7b829c71f 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java @@ -1,8 +1,25 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.alibaba.cloud.stream.binder.rocketmq; import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport; import org.apache.rocketmq.common.message.MessageConst; import org.junit.jupiter.api.Test; + import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; From 455981547a8e3fd2beaf1a75edb61a97762d37cd Mon Sep 17 00:00:00 2001 From: sorie Date: Wed, 25 May 2022 19:52:20 +0800 Subject: [PATCH 5/6] fix mvn install problem. --- spring-cloud-alibaba-examples/pom.xml | 2 -- 1 file changed, 2 deletions(-) diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index f7b2cd3e8..82c346d77 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -32,8 +32,6 @@ seata-example/order-service seata-example/storage-service seata-example/account-service - rocketmq-example/rocketmq-consume-example - rocketmq-example/rocketmq-produce-example rocketmq-example/rocketmq-comprehensive-example rocketmq-example/rocketmq-orderly-consume-example rocketmq-example/rocketmq-broadcast-example/rocketmq-broadcast-producer-example From 7fe86a9f09584b421b87b60f041d1b0b196dc172 Mon Sep 17 00:00:00 2001 From: sorie Date: Wed, 25 May 2022 19:53:54 +0800 Subject: [PATCH 6/6] fix mvn install problem. --- spring-cloud-alibaba-examples/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/spring-cloud-alibaba-examples/pom.xml b/spring-cloud-alibaba-examples/pom.xml index 82c346d77..35df4923d 100644 --- a/spring-cloud-alibaba-examples/pom.xml +++ b/spring-cloud-alibaba-examples/pom.xml @@ -26,7 +26,6 @@ nacos-example/nacos-discovery-example nacos-example/nacos-config-example nacos-example/nacos-config-2.4.x-example - nacos-example/nacos-config-preference-example nacos-example/nacos-gateway-example seata-example/business-service seata-example/order-service