diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java index d0f6c6234..bac250c41 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/pull/RocketMQMessageSource.java @@ -44,6 +44,7 @@ import org.springframework.integration.endpoint.AbstractMessageSource; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; /** * @author Jim @@ -109,13 +110,14 @@ public class RocketMQMessageSource extends AbstractMessageSource this.running = true; } - private MessageQueue acquireCurrentMessageQueue(String topic, int queueId) { - Collection messageQueueSet = messageQueuesForTopic.get(topic); - if (CollectionUtils.isEmpty(messageQueueSet)) { + private MessageQueue acquireCurrentMessageQueue(String topic,int queueId, String brokerName) { + Collection messageQueueSet = messageQueuesForTopic.get(topic); + if(CollectionUtils.isEmpty(messageQueueSet)){ return null; } for (MessageQueue messageQueue : messageQueueSet) { - if (messageQueue.getQueueId() == queueId) { + if (messageQueue.getQueueId() == queueId && + ObjectUtils.nullSafeEquals(brokerName, messageQueue.getBrokerName())) { return messageQueue; } } @@ -152,8 +154,8 @@ public class RocketMQMessageSource extends AbstractMessageSource if (null == messageExt) { return null; } - MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(), - messageExt.getQueueId()); + MessageQueue messageQueue = this.acquireCurrentMessageQueue(messageExt.getTopic(), messageExt.getQueueId(), + messageExt.getBrokerName()); if (messageQueue == null) { throw new IllegalArgumentException( "The message queue is not in assigned list");