diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 781a1896e..d12a3b3d3 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -20,6 +20,8 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; @@ -57,6 +59,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; +import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.converter.DefaultContentTypeResolver; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.support.ChannelInterceptor; @@ -82,6 +85,9 @@ public class RocketMQMessageChannelBinder extends private final RocketMQBinderConfigurationProperties binderConfigurationProperties; + private final MessageCollectorImpl messageCollector = new MessageCollectorImpl(); + private final ConcurrentMap messageChannels = new ConcurrentHashMap(); + public RocketMQMessageChannelBinder( RocketMQBinderConfigurationProperties binderConfigurationProperties, RocketMQExtendedBindingProperties extendedBindingProperties, @@ -116,9 +122,15 @@ public class RocketMQMessageChannelBinder extends messageHandler.setPartitioningInterceptor(partitioningInterceptor); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); messageHandler.setErrorMessageStrategy(this.getErrorMessageStrategy()); + + final BlockingQueue> queue = this.messageCollector.register(channel, extendedProducerProperties.isUseNativeEncoding()); + ((SubscribableChannel)channel).subscribe(queue::add); + this.messageChannels.put(destination.getName(), channel); + return messageHandler; } + @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, @@ -285,7 +297,6 @@ public class RocketMQMessageChannelBinder extends } } - private static class MessageCollectorImpl implements MessageCollector { private final Map>> results; @@ -304,12 +315,8 @@ public class RocketMQMessageChannelBinder extends return result; } - private void unregister(MessageChannel channel) { - Assert.notNull(this.results.remove(channel), "Trying to unregister a mapping for an unknown channel [" + channel + "]"); - } - public BlockingQueue> forChannel(MessageChannel channel) { - BlockingQueue> queue = (BlockingQueue)this.results.get(channel); + BlockingQueue> queue = this.results.get(channel); Assert.notNull(queue, "Channel [" + channel + "] was not bound by " + RocketMQMessageChannelBinder.class); return queue; } diff --git a/spring-cloud-alibaba-tests/rocketmq-tests/rocketmq-stream-test/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketmqProduceAndConsumerTests.java b/spring-cloud-alibaba-tests/rocketmq-tests/rocketmq-stream-test/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketmqProduceAndConsumerTests.java index 421eb4d76..374e04ac1 100644 --- a/spring-cloud-alibaba-tests/rocketmq-tests/rocketmq-stream-test/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketmqProduceAndConsumerTests.java +++ b/spring-cloud-alibaba-tests/rocketmq-tests/rocketmq-stream-test/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketmqProduceAndConsumerTests.java @@ -6,7 +6,6 @@ import java.util.concurrent.BlockingQueue; import com.alibaba.cloud.rocketmq.SimpleMsg; import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; import com.alibaba.cloud.stream.binder.rocketmq.support.MessageCollector; import com.alibaba.cloud.testsupport.SpringCloudAlibaba; import com.alibaba.cloud.testsupport.TestExtend; @@ -14,7 +13,6 @@ import org.apache.rocketmq.common.message.MessageConst; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -25,18 +23,13 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerA import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration; -import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; -import org.springframework.cloud.stream.messaging.Processor; -import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.GenericMessage; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit4.SpringRunner; import static com.alibaba.cloud.testsupport.Constant.TIME_OUT; import static org.hamcrest.CoreMatchers.is;