Merge pull request #2571 from Sorieee/2021.x.rocketmq_blank_header_support

2021.x. rocketmq multi-broker offset error and blank header support
pull/2596/head
Steve Rao 3 years ago committed by GitHub
commit 904cf79380
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -44,6 +44,7 @@ import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
/**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -109,13 +110,15 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
this.running = true;
}
private MessageQueue acquireCurrentMessageQueue(String topic, int queueId) {
private MessageQueue acquireCurrentMessageQueue(String topic, int queueId,
String brokerName) {
Collection<MessageQueue> messageQueueSet = messageQueuesForTopic.get(topic);
if (CollectionUtils.isEmpty(messageQueueSet)) {
return null;
}
for (MessageQueue messageQueue : messageQueueSet) {
if (messageQueue.getQueueId() == queueId) {
if (messageQueue.getQueueId() == queueId && ObjectUtils
.nullSafeEquals(brokerName, messageQueue.getBrokerName())) {
return messageQueue;
}
}
@ -153,7 +156,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
return null;
}
MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(),
messageExt.getQueueId());
messageExt.getQueueId(), messageExt.getBrokerName());
if (messageQueue == null) {
throw new IllegalArgumentException(
"The message queue is not in assigned list");

@ -175,8 +175,11 @@ public final class RocketMQMessageConverterSupport {
.filter(entry -> !Objects.equals(entry.getKey(), Headers.FLAG))
.forEach(entry -> {
if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
rocketMsg.putUserProperty(entry.getKey(),
String.valueOf(entry.getValue()));
String val = String.valueOf(entry.getValue());
// Remove All blank header(rocketmq not support).
if (org.apache.commons.lang3.StringUtils.isNotBlank(val)) {
rocketMsg.putUserProperty(entry.getKey(), val);
}
}
});

@ -0,0 +1,47 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.cloud.stream.binder.rocketmq;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import org.apache.rocketmq.common.message.MessageConst;
import org.junit.jupiter.api.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Sorie
*/
public class RocketMQMessageConverterSupportTest {
@Test
public void convertMessage2MQBlankHeaderTest() {
String destination = "test";
Message message = MessageBuilder.withPayload("msg")
.setHeader(MessageConst.PROPERTY_TAGS, "a")
.setHeader("test", "")
.build();
org.apache.rocketmq.common.message.Message rkmqMsg =
RocketMQMessageConverterSupport.convertMessage2MQ(destination, message);
String testProp = rkmqMsg.getProperty("test");
String tagProp = rkmqMsg.getProperty(MessageConst.PROPERTY_TAGS);
assertThat(testProp).isNull();
assertThat(tagProp).isEqualTo("a");
}
}
Loading…
Cancel
Save