rocketmq multi-broker offset error

pull/2571/head^2
sorie 3 years ago committed by sorie
parent 6e59ee2b99
commit 5dfbbd423e

@ -110,14 +110,15 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
this.running = true;
}
private MessageQueue acquireCurrentMessageQueue(String topic,int queueId, String brokerName) {
Collection<MessageQueue> messageQueueSet = messageQueuesForTopic.get(topic);
if(CollectionUtils.isEmpty(messageQueueSet)){
private MessageQueue acquireCurrentMessageQueue(String topic, int queueId,
String brokerName) {
Collection<MessageQueue> messageQueueSet = messageQueuesForTopic.get(topic);
if (CollectionUtils.isEmpty(messageQueueSet)) {
return null;
}
for (MessageQueue messageQueue : messageQueueSet) {
if (messageQueue.getQueueId() == queueId &&
ObjectUtils.nullSafeEquals(brokerName, messageQueue.getBrokerName())) {
if (messageQueue.getQueueId() == queueId && ObjectUtils
.nullSafeEquals(brokerName, messageQueue.getBrokerName())) {
return messageQueue;
}
}
@ -154,8 +155,8 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
if (null == messageExt) {
return null;
}
MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(), messageExt.getQueueId(),
messageExt.getBrokerName());
MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(),
messageExt.getQueueId(), messageExt.getBrokerName());
if (messageQueue == null) {
throw new IllegalArgumentException(
"The message queue is not in assigned list");

@ -175,8 +175,11 @@ public final class RocketMQMessageConverterSupport {
.filter(entry -> !Objects.equals(entry.getKey(), Headers.FLAG))
.forEach(entry -> {
if (!MessageConst.STRING_HASH_SET.contains(entry.getKey())) {
rocketMsg.putUserProperty(entry.getKey(),
String.valueOf(entry.getValue()));
String val = String.valueOf(entry.getValue());
// Remove All blank header(rocketmq not support).
if (org.apache.commons.lang3.StringUtils.isNotBlank(val)) {
rocketMsg.putUserProperty(entry.getKey(), val);
}
}
});

Loading…
Cancel
Save