From 68bb568ee450dc7c6bb44c5c0a56b5eea99d8009 Mon Sep 17 00:00:00 2001 From: windwheel Date: Sat, 27 Aug 2022 22:11:31 +0800 Subject: [PATCH] add channel outout collector --- .../RocketMQMessageChannelBinder.java | 103 ++++++++++++++++++ .../rocketmq/support/MessageCollector.java | 10 ++ .../RocketmqProduceAndConsumerTests.java | 59 +++++----- 3 files changed, 145 insertions(+), 27 deletions(-) create mode 100644 spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/MessageCollector.java diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index f83eac76f..781a1896e 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,6 +16,12 @@ package com.alibaba.cloud.stream.binder.rocketmq; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler; import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter; @@ -27,6 +33,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import com.alibaba.cloud.stream.binder.rocketmq.support.MessageCollector; import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; import org.apache.rocketmq.common.protocol.NamespaceUtil; @@ -36,6 +43,8 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; import org.springframework.cloud.stream.binding.MessageConverterConfigurer; +import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; +import org.springframework.cloud.stream.converter.MessageConverterUtils; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.integration.StaticMessageHeaderAccessor; @@ -44,9 +53,17 @@ import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.support.DefaultErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageStrategy; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; +import org.springframework.messaging.converter.DefaultContentTypeResolver; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.ChannelInterceptor; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; import org.springframework.util.StringUtils; @@ -211,4 +228,90 @@ public class RocketMQMessageChannelBinder extends public Class getExtendedPropertiesEntryClass() { return this.extendedBindingProperties.getExtendedPropertiesEntryClass(); } + + private static final class InboundMessageConvertingInterceptor implements ChannelInterceptor { + private final DefaultContentTypeResolver contentTypeResolver; + private final CompositeMessageConverterFactory converterFactory; + + private InboundMessageConvertingInterceptor() { + this.contentTypeResolver = new DefaultContentTypeResolver(); + this.converterFactory = new CompositeMessageConverterFactory(); + } + + private static boolean equalTypeAndSubType(MimeType m1, MimeType m2) { + return m1 != null && m2 != null && m1.getType().equalsIgnoreCase(m2.getType()) && m1.getSubtype().equalsIgnoreCase(m2.getSubtype()); + } + + public Message preSend(Message message, MessageChannel channel) { + Class targetClass = null; + MessageConverter converter = null; + MimeType contentType = MimeType.valueOf(this.contentTypeResolver.resolve(message.getHeaders()).toString()); + if (contentType != null && (equalTypeAndSubType(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT, contentType) || equalTypeAndSubType(MessageConverterUtils.X_JAVA_OBJECT, contentType))) { + message = MessageBuilder.fromMessage(message).setHeader("contentType", contentType).build(); + converter = equalTypeAndSubType(MessageConverterUtils.X_JAVA_SERIALIZED_OBJECT, contentType) ? this.converterFactory.getMessageConverterForType(contentType) : this.converterFactory.getMessageConverterForAllRegistered(); + String targetClassName = contentType.getParameter("type"); + if (StringUtils.hasText(targetClassName)) { + try { + targetClass = Class.forName(targetClassName, false, Thread.currentThread().getContextClassLoader()); + } catch (Exception var8) { + throw new IllegalStateException("Failed to determine class name for contentType: " + message.getHeaders(), var8); + } + } + } + + Object payload; + if (converter != null) { + Assert.isTrue(!equalTypeAndSubType(MessageConverterUtils.X_JAVA_OBJECT, contentType) || targetClass != null, "Cannot deserialize into message since 'contentType` is not encoded with the actual target type.Consider 'application/x-java-object; type=foo.bar.MyClass'"); + payload = converter.fromMessage(message, targetClass); + } else { + MimeType deserializeContentType = this.contentTypeResolver.resolve(message.getHeaders()); + if (deserializeContentType == null) { + deserializeContentType = contentType; + } + + payload = deserializeContentType == null ? message.getPayload() : this.deserializePayload(message.getPayload(), deserializeContentType); + } + + message = MessageBuilder.withPayload(payload).copyHeaders(message.getHeaders()).setHeader("contentType", contentType).build(); + return message; + } + + private Object deserializePayload(Object payload, MimeType contentType) { + if (payload instanceof byte[] && ("text".equalsIgnoreCase(contentType.getType()) || equalTypeAndSubType(MimeTypeUtils.APPLICATION_JSON, contentType))) { + payload = new String((byte[])((byte[])payload), StandardCharsets.UTF_8); + } + + return payload; + } + } + + + private static class MessageCollectorImpl implements MessageCollector { + private final Map>> results; + + private MessageCollectorImpl() { + this.results = new HashMap(); + } + + private BlockingQueue> register(MessageChannel channel, boolean useNativeEncoding) { + if (!useNativeEncoding) { + ((AbstractMessageChannel)channel).addInterceptor(new InboundMessageConvertingInterceptor()); + } + + LinkedBlockingDeque> result = new LinkedBlockingDeque(); + Assert.isTrue(!this.results.containsKey(channel), "Channel [" + channel + "] was already bound"); + this.results.put(channel, result); + return result; + } + + private void unregister(MessageChannel channel) { + Assert.notNull(this.results.remove(channel), "Trying to unregister a mapping for an unknown channel [" + channel + "]"); + } + + public BlockingQueue> forChannel(MessageChannel channel) { + BlockingQueue> queue = (BlockingQueue)this.results.get(channel); + Assert.notNull(queue, "Channel [" + channel + "] was not bound by " + RocketMQMessageChannelBinder.class); + return queue; + } + } } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/MessageCollector.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/MessageCollector.java new file mode 100644 index 000000000..d94ac5330 --- /dev/null +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/MessageCollector.java @@ -0,0 +1,10 @@ +package com.alibaba.cloud.stream.binder.rocketmq.support; + +import java.util.concurrent.BlockingQueue; + +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; + +public interface MessageCollector { + BlockingQueue> forChannel(MessageChannel channel); +} diff --git a/spring-cloud-alibaba-tests/rocketmq-tests/rocketmq-stream-test/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketmqProduceAndConsumerTests.java b/spring-cloud-alibaba-tests/rocketmq-tests/rocketmq-stream-test/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketmqProduceAndConsumerTests.java index 212241a5e..421eb4d76 100644 --- a/spring-cloud-alibaba-tests/rocketmq-tests/rocketmq-stream-test/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketmqProduceAndConsumerTests.java +++ b/spring-cloud-alibaba-tests/rocketmq-tests/rocketmq-stream-test/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketmqProduceAndConsumerTests.java @@ -2,57 +2,50 @@ package com.alibaba.cloud.stream.binder.rocketmq; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.BlockingQueue; import com.alibaba.cloud.rocketmq.SimpleMsg; import com.alibaba.cloud.stream.binder.rocketmq.autoconfigurate.RocketMQBinderAutoConfiguration; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.support.MessageCollector; import com.alibaba.cloud.testsupport.SpringCloudAlibaba; import com.alibaba.cloud.testsupport.TestExtend; import org.apache.rocketmq.common.message.MessageConst; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.platform.commons.util.StringUtils; import org.junit.runner.RunWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationRunner; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.ImportAutoConfiguration; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; +import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration; +import org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationConfiguration; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.binder.PollableMessageSource; import org.springframework.cloud.stream.messaging.Processor; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.cloud.stream.messaging.Source; -import org.springframework.context.annotation.Bean; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.DependsOn; -import org.springframework.context.annotation.Profile; -import org.springframework.core.ParameterizedTypeReference; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.GenericMessage; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit4.SpringRunner; import static com.alibaba.cloud.testsupport.Constant.TIME_OUT; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.NONE; @SpringCloudAlibaba(composeFiles = "docker/rocket-compose-test.yml", serviceName = "rocketmq-standalone") -@TestExtend(time = 10 * TIME_OUT) -@RunWith(SpringRunner.class) -@EnableBinding({Processor.class, RocketmqProduceAndConsumerTests.PolledProcessor.class}) -@DirtiesContext +@TestExtend(time = 6 * TIME_OUT) +//@EnableBinding({Processor.class, RocketmqProduceAndConsumerTests.PolledProcessor.class}) @SpringBootTest(classes = RocketmqProduceAndConsumerTests.TestConfig.class, webEnvironment = NONE, properties = { "spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876,127.0.0.1:9877", "spring.cloud.stream.rocketmq.binder.group=flaky-group", @@ -64,15 +57,24 @@ import static org.springframework.boot.test.context.SpringBootTest.WebEnvironmen "spring.cloud.stream.bindings.input1.destination=TopicOrderTest", "spring.cloud.stream.bindings.input1.content-type=application/json", "spring.cloud.stream.bindings.input1.group=test-group1", - "spring.cloud.stream.rocketmq.bindings.input1.consumer.push.orderly=true", + "spring.cloud.stream.bindings.input1.consumer.push.orderly=true", "spring.cloud.stream.bindings.input1.consumer.maxAttempts=1",}) public class RocketmqProduceAndConsumerTests { + + @Autowired + private MessageCollector collector; + + @Autowired + @Qualifier("input1") + private MessageChannel input1; @Autowired - private PolledProcessor processor; + @Qualifier("output") + private MessageChannel output; @BeforeAll public static void prepare(){ + } @BeforeEach @@ -84,20 +86,23 @@ public class RocketmqProduceAndConsumerTests { headers.put(MessageConst.PROPERTY_TAGS, "TagA"); headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, messageId); Message msg = new GenericMessage(new SimpleMsg("Hello RocketMQ"), headers); - processor.output().send(msg); + input1.send(msg); } @Test - @StreamListener(PolledProcessor.CUSTOMIZE_OUTPUT) - public void testConsumeAndProduce() throws InterruptedException { + public void testConsumeAndProduce() throws Exception { + BlockingQueue> messages = this.collector.forChannel(this.output); - processor.input().subscribe(message -> Assertions.assertEquals(message,"Hello RocketMQ" )); + assertThat(messages, is("Hello RocketMQ")); } @Configuration @EnableAutoConfiguration - @ImportAutoConfiguration({ AutoServiceRegistrationConfiguration.class, - RocketMQBinderAutoConfiguration.class }) + @ImportAutoConfiguration(value = { AutoServiceRegistrationConfiguration.class, + RocketMQBinderAutoConfiguration.class}, exclude = { + DataSourceAutoConfiguration.class, + TransactionAutoConfiguration.class, + DataSourceTransactionManagerAutoConfiguration.class}) public static class TestConfig { }