add channel outout collector

pull/2750/head
windwheel 2 years ago committed by Steve Rao
parent 483c729913
commit 68bb568ee4

@ -16,6 +16,12 @@
package com.alibaba.cloud.stream.binder.rocketmq;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache;
import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler;
import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter;
@ -27,6 +33,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.alibaba.cloud.stream.binder.rocketmq.support.MessageCollector;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
@ -36,6 +43,8 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.converter.MessageConverterUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.integration.StaticMessageHeaderAccessor;
@ -44,9 +53,17 @@ import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.support.DefaultErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StringUtils;
@ -211,4 +228,90 @@ public class RocketMQMessageChannelBinder extends
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
}
private static final class InboundMessageConvertingInterceptor implements ChannelInterceptor {
private final DefaultContentTypeResolver contentTypeResolver;
private final CompositeMessageConverterFactory converterFactory;
private InboundMessageConvertingInterceptor() {
this.contentTypeResolver = new DefaultContentTypeResolver();
this.converterFactory = new CompositeMessageConverterFactory();
}
private static boolean equalTypeAndSubType(MimeType m1, MimeType m2) {
return m1 != null && m2 != null && m1.getType().equalsIgnoreCase(m2.getType()) && m1.getSubtype().equalsIgnoreCase(m2.getSubtype());
}
public Message<?> preSend(Message<?> message, MessageChannel channel) {
Class<?> targetClass = null;
MessageConverter converter = null;
MimeType contentType = MimeType.valueOf(this.contentTypeResolver.resolve(message.getHeaders()).toString());
if (contentType != null && (equalTypeAndSubType(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT, contentType) || equalTypeAndSubType(MessageConverterUtils.X_JAVA_OBJECT, contentType))) {
message = MessageBuilder.fromMessage(message).setHeader("contentType", contentType).build();
converter = equalTypeAndSubType(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT, contentType) ? this.converterFactory.getMessageConverterForType(contentType) : this.converterFactory.getMessageConverterForAllRegistered();
String targetClassName = contentType.getParameter("type");
if (StringUtils.hasText(targetClassName)) {
try {
targetClass = Class.forName(targetClassName, false, Thread.currentThread().getContextClassLoader());
} catch (Exception var8) {
throw new IllegalStateException("Failed to determine class name for contentType: " + message.getHeaders(), var8);
}
}
}
Object payload;
if (converter != null) {
Assert.isTrue(!equalTypeAndSubType(MessageConverterUtils.X_JAVA_OBJECT, contentType) || targetClass != null, "Cannot deserialize into message since 'contentType` is not encoded with the actual target type.Consider 'application/x-java-object; type=foo.bar.MyClass'");
payload = converter.fromMessage(message, targetClass);
} else {
MimeType deserializeContentType = this.contentTypeResolver.resolve(message.getHeaders());
if (deserializeContentType == null) {
deserializeContentType = contentType;
}
payload = deserializeContentType == null ? message.getPayload() : this.deserializePayload(message.getPayload(), deserializeContentType);
}
message = MessageBuilder.withPayload(payload).copyHeaders(message.getHeaders()).setHeader("contentType", contentType).build();
return message;
}
private Object deserializePayload(Object payload, MimeType contentType) {
if (payload instanceof byte[] && ("text".equalsIgnoreCase(contentType.getType()) || equalTypeAndSubType(MimeTypeUtils.APPLICATION_JSON, contentType))) {
payload = new String((byte[])((byte[])payload), StandardCharsets.UTF_8);
}
return payload;
}
}
private static class MessageCollectorImpl implements MessageCollector {
private final Map<MessageChannel, BlockingQueue<Message<?>>> results;
private MessageCollectorImpl() {
this.results = new HashMap();
}
private BlockingQueue<Message<?>> register(MessageChannel channel, boolean useNativeEncoding) {
if (!useNativeEncoding) {
((AbstractMessageChannel)channel).addInterceptor(new InboundMessageConvertingInterceptor());
}
LinkedBlockingDeque<Message<?>> result = new LinkedBlockingDeque();
Assert.isTrue(!this.results.containsKey(channel), "Channel [" + channel + "] was already bound");
this.results.put(channel, result);
return result;
}
private void unregister(MessageChannel channel) {
Assert.notNull(this.results.remove(channel), "Trying to unregister a mapping for an unknown channel [" + channel + "]");
}
public BlockingQueue<Message<?>> forChannel(MessageChannel channel) {
BlockingQueue<Message<?>> queue = (BlockingQueue)this.results.get(channel);
Assert.notNull(queue, "Channel [" + channel + "] was not bound by " + RocketMQMessageChannelBinder.class);
return queue;
}
}
}

@ -0,0 +1,10 @@
package com.alibaba.cloud.stream.binder.rocketmq.support;
import java.util.concurrent.BlockingQueue;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
public interface MessageCollector {
BlockingQueue<Message<?>> forChannel(MessageChannel channel);
}

@ -2,57 +2,50 @@ package com.alibaba.cloud.stream.binder.rocketmq;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import com.alibaba.cloud.rocketmq.SimpleMsg;
import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.MessageCollector;
import com.alibaba.cloud.testsupport.SpringCloudAlibaba;
import com.alibaba.cloud.testsupport.TestExtend;
import org.apache.rocketmq.common.message.MessageConst;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.StringUtils;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Profile;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;
import static com.alibaba.cloud.testsupport.Constant.TIME_OUT;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE;
@SpringCloudAlibaba(composeFiles = "docker/rocket-compose-test.yml", serviceName = "rocketmq-standalone")
@TestExtend(time = 10 * TIME_OUT)
@RunWith(SpringRunner.class)
@EnableBinding({Processor.class, RocketmqProduceAndConsumerTests.PolledProcessor.class})
@DirtiesContext
@TestExtend(time = 6 * TIME_OUT)
//@EnableBinding({Processor.class, RocketmqProduceAndConsumerTests.PolledProcessor.class})
@SpringBootTest(classes = RocketmqProduceAndConsumerTests.TestConfig.class, webEnvironment = NONE, properties = {
"spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876,127.0.0.1:9877",
"spring.cloud.stream.rocketmq.binder.group=flaky-group",
@ -64,15 +57,24 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironmen
"spring.cloud.stream.bindings.input1.destination=TopicOrderTest",
"spring.cloud.stream.bindings.input1.content-type=application/json",
"spring.cloud.stream.bindings.input1.group=test-group1",
"spring.cloud.stream.rocketmq.bindings.input1.consumer.push.orderly=true",
"spring.cloud.stream.bindings.input1.consumer.push.orderly=true",
"spring.cloud.stream.bindings.input1.consumer.maxAttempts=1",})
public class RocketmqProduceAndConsumerTests {
@Autowired
private MessageCollector collector;
@Autowired
@Qualifier("input1")
private MessageChannel input1;
@Autowired
private PolledProcessor processor;
@Qualifier("output")
private MessageChannel output;
@BeforeAll
public static void prepare(){
}
@BeforeEach
@ -84,20 +86,23 @@ public class RocketmqProduceAndConsumerTests {
headers.put(MessageConst.PROPERTY_TAGS, "TagA");
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, messageId);
Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ"), headers);
processor.output().send(msg);
input1.send(msg);
}
@Test
@StreamListener(PolledProcessor.CUSTOMIZE_OUTPUT)
public void testConsumeAndProduce() throws InterruptedException {
public void testConsumeAndProduce() throws Exception {
BlockingQueue<Message<?>> messages = this.collector.forChannel(this.output);
processor.input().subscribe(message -> Assertions.assertEquals(message,"Hello RocketMQ" ));
assertThat(messages, is("Hello RocketMQ"));
}
@Configuration
@EnableAutoConfiguration
@ImportAutoConfiguration({ AutoServiceRegistrationConfiguration.class,
RocketMQBinderAutoConfiguration.class })
@ImportAutoConfiguration(value = { AutoServiceRegistrationConfiguration.class,
RocketMQBinderAutoConfiguration.class}, exclude = {
DataSourceAutoConfiguration.class,
TransactionAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class})
public static class TestConfig {
}

Loading…
Cancel
Save