add register channel

pull/2750/head
windwheel 3 years ago committed by Steve Rao
parent 68bb568ee4
commit 10deb9cbcd

@ -20,6 +20,8 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
@ -57,6 +59,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.ChannelInterceptor;
@ -82,6 +85,9 @@ public class RocketMQMessageChannelBinder extends
private final RocketMQBinderConfigurationProperties binderConfigurationProperties;
private final MessageCollectorImpl messageCollector = new MessageCollectorImpl();
private final ConcurrentMap<String, MessageChannel> messageChannels = new ConcurrentHashMap();
public RocketMQMessageChannelBinder(
RocketMQBinderConfigurationProperties binderConfigurationProperties,
RocketMQExtendedBindingProperties extendedBindingProperties,
@ -116,9 +122,15 @@ public class RocketMQMessageChannelBinder extends
messageHandler.setPartitioningInterceptor(partitioningInterceptor);
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
messageHandler.setErrorMessageStrategy(this.getErrorMessageStrategy());
final BlockingQueue<Message<?>> queue = this.messageCollector.register(channel, extendedProducerProperties.isUseNativeEncoding());
((SubscribableChannel)channel).subscribe(queue::add);
this.messageChannels.put(destination.getName(), channel);
return messageHandler;
}
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
@ -285,7 +297,6 @@ public class RocketMQMessageChannelBinder extends
}
}
private static class MessageCollectorImpl implements MessageCollector {
private final Map<MessageChannel, BlockingQueue<Message<?>>> results;
@ -304,12 +315,8 @@ public class RocketMQMessageChannelBinder extends
return result;
}
private void unregister(MessageChannel channel) {
Assert.notNull(this.results.remove(channel), "Trying to unregister a mapping for an unknown channel [" + channel + "]");
}
public BlockingQueue<Message<?>> forChannel(MessageChannel channel) {
BlockingQueue<Message<?>> queue = (BlockingQueue)this.results.get(channel);
BlockingQueue<Message<?>> queue = this.results.get(channel);
Assert.notNull(queue, "Channel [" + channel + "] was not bound by " + RocketMQMessageChannelBinder.class);
return queue;
}

@ -6,7 +6,6 @@ import java.util.concurrent.BlockingQueue;
import com.alibaba.cloud.rocketmq.SimpleMsg;
import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.MessageCollector;
import com.alibaba.cloud.testsupport.SpringCloudAlibaba;
import com.alibaba.cloud.testsupport.TestExtend;
@ -14,7 +13,6 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
@ -25,18 +23,13 @@ import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerA
import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import static com.alibaba.cloud.testsupport.Constant.TIME_OUT;
import static org.hamcrest.CoreMatchers.is;

Loading…
Cancel
Save