rocketmq multi-broker offset error

pull/2571/head
sorie 3 years ago
parent bb91f230bd
commit 62ff55cd16

@ -44,6 +44,7 @@ import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
/** /**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -109,13 +110,14 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
this.running = true; this.running = true;
} }
private MessageQueue acquireCurrentMessageQueue(String topic, int queueId) { private MessageQueue acquireCurrentMessageQueue(String topic,int queueId, String brokerName) {
Collection<MessageQueue> messageQueueSet = messageQueuesForTopic.get(topic); Collection<MessageQueue> messageQueueSet = messageQueuesForTopic.get(topic);
if(CollectionUtils.isEmpty(messageQueueSet)){ if(CollectionUtils.isEmpty(messageQueueSet)){
return null; return null;
} }
for (MessageQueue messageQueue : messageQueueSet) { for (MessageQueue messageQueue : messageQueueSet) {
if (messageQueue.getQueueId() == queueId) { if (messageQueue.getQueueId() == queueId &&
ObjectUtils.nullSafeEquals(brokerName, messageQueue.getBrokerName())) {
return messageQueue; return messageQueue;
} }
} }
@ -152,8 +154,8 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>
if (null == messageExt) { if (null == messageExt) {
return null; return null;
} }
MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(), MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(), messageExt.getQueueId(),
messageExt.getQueueId()); messageExt.getBrokerName());
if (messageQueue == null) { if (messageQueue == null) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"The message queue is not in assigned list"); "The message queue is not in assigned list");

Loading…
Cancel
Save