@ -16,14 +16,6 @@
package com.alibaba.cloud.stream.binder.rocketmq ;
import java.nio.charset.StandardCharsets ;
import java.util.HashMap ;
import java.util.Map ;
import java.util.concurrent.BlockingQueue ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentMap ;
import java.util.concurrent.LinkedBlockingDeque ;
import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache ;
import com.alibaba.cloud.stream.binder.rocketmq.extend.ErrorAcknowledgeHandler ;
import com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter ;
@ -35,7 +27,6 @@ import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerPrope
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties ;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties ;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner ;
import com.alibaba.cloud.stream.binder.rocketmq.support.MessageCollector ;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils ;
import org.apache.rocketmq.common.protocol.NamespaceUtil ;
@ -45,8 +36,6 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties ;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder ;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer ;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory ;
import org.springframework.cloud.stream.converter.MessageConverterUtils ;
import org.springframework.cloud.stream.provisioning.ConsumerDestination ;
import org.springframework.cloud.stream.provisioning.ProducerDestination ;
import org.springframework.integration.StaticMessageHeaderAccessor ;
@ -55,18 +44,9 @@ import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.core.MessageProducer ;
import org.springframework.integration.support.DefaultErrorMessageStrategy ;
import org.springframework.integration.support.ErrorMessageStrategy ;
import org.springframework.messaging.Message ;
import org.springframework.messaging.MessageChannel ;
import org.springframework.messaging.MessageHandler ;
import org.springframework.messaging.MessagingException ;
import org.springframework.messaging.SubscribableChannel ;
import org.springframework.messaging.converter.DefaultContentTypeResolver ;
import org.springframework.messaging.converter.MessageConverter ;
import org.springframework.messaging.support.ChannelInterceptor ;
import org.springframework.messaging.support.MessageBuilder ;
import org.springframework.util.Assert ;
import org.springframework.util.MimeType ;
import org.springframework.util.MimeTypeUtils ;
import org.springframework.util.StringUtils ;
@ -85,9 +65,6 @@ public class RocketMQMessageChannelBinder extends
private final RocketMQBinderConfigurationProperties binderConfigurationProperties ;
private final MessageCollectorImpl messageCollector = new MessageCollectorImpl ( ) ;
private final ConcurrentMap < String , MessageChannel > messageChannels = new ConcurrentHashMap ( ) ;
public RocketMQMessageChannelBinder (
RocketMQBinderConfigurationProperties binderConfigurationProperties ,
RocketMQExtendedBindingProperties extendedBindingProperties ,
@ -122,15 +99,9 @@ public class RocketMQMessageChannelBinder extends
messageHandler . setPartitioningInterceptor ( partitioningInterceptor ) ;
messageHandler . setBeanFactory ( this . getApplicationContext ( ) . getBeanFactory ( ) ) ;
messageHandler . setErrorMessageStrategy ( this . getErrorMessageStrategy ( ) ) ;
final BlockingQueue < Message < ? > > queue = this . messageCollector . register ( channel , extendedProducerProperties . isUseNativeEncoding ( ) ) ;
( ( SubscribableChannel ) channel ) . subscribe ( queue : : add ) ;
this . messageChannels . put ( destination . getName ( ) , channel ) ;
return messageHandler ;
}
@Override
protected MessageHandler createProducerMessageHandler ( ProducerDestination destination ,
ExtendedProducerProperties < RocketMQProducerProperties > producerProperties ,
@ -240,85 +211,4 @@ public class RocketMQMessageChannelBinder extends
public Class < ? extends BinderSpecificPropertiesProvider > getExtendedPropertiesEntryClass ( ) {
return this . extendedBindingProperties . getExtendedPropertiesEntryClass ( ) ;
}
private static final class InboundMessageConvertingInterceptor implements ChannelInterceptor {
private final DefaultContentTypeResolver contentTypeResolver ;
private final CompositeMessageConverterFactory converterFactory ;
private InboundMessageConvertingInterceptor ( ) {
this . contentTypeResolver = new DefaultContentTypeResolver ( ) ;
this . converterFactory = new CompositeMessageConverterFactory ( ) ;
}
private static boolean equalTypeAndSubType ( MimeType m1 , MimeType m2 ) {
return m1 ! = null & & m2 ! = null & & m1 . getType ( ) . equalsIgnoreCase ( m2 . getType ( ) ) & & m1 . getSubtype ( ) . equalsIgnoreCase ( m2 . getSubtype ( ) ) ;
}
public Message < ? > preSend ( Message < ? > message , MessageChannel channel ) {
Class < ? > targetClass = null ;
MessageConverter converter = null ;
MimeType contentType = MimeType . valueOf ( this . contentTypeResolver . resolve ( message . getHeaders ( ) ) . toString ( ) ) ;
if ( contentType ! = null & & ( equalTypeAndSubType ( MessageConverterUtils . X_JAVA_SERIALIZED_OBJECT , contentType ) | | equalTypeAndSubType ( MessageConverterUtils . X_JAVA_OBJECT , contentType ) ) ) {
message = MessageBuilder . fromMessage ( message ) . setHeader ( "contentType" , contentType ) . build ( ) ;
converter = equalTypeAndSubType ( MessageConverterUtils . X_JAVA_SERIALIZED_OBJECT , contentType ) ? this . converterFactory . getMessageConverterForType ( contentType ) : this . converterFactory . getMessageConverterForAllRegistered ( ) ;
String targetClassName = contentType . getParameter ( "type" ) ;
if ( StringUtils . hasText ( targetClassName ) ) {
try {
targetClass = Class . forName ( targetClassName , false , Thread . currentThread ( ) . getContextClassLoader ( ) ) ;
} catch ( Exception var8 ) {
throw new IllegalStateException ( "Failed to determine class name for contentType: " + message . getHeaders ( ) , var8 ) ;
}
}
}
Object payload ;
if ( converter ! = null ) {
Assert . isTrue ( ! equalTypeAndSubType ( MessageConverterUtils . X_JAVA_OBJECT , contentType ) | | targetClass ! = null , "Cannot deserialize into message since 'contentType` is not encoded with the actual target type.Consider 'application/x-java-object; type=foo.bar.MyClass'" ) ;
payload = converter . fromMessage ( message , targetClass ) ;
} else {
MimeType deserializeContentType = this . contentTypeResolver . resolve ( message . getHeaders ( ) ) ;
if ( deserializeContentType = = null ) {
deserializeContentType = contentType ;
}
payload = deserializeContentType = = null ? message . getPayload ( ) : this . deserializePayload ( message . getPayload ( ) , deserializeContentType ) ;
}
message = MessageBuilder . withPayload ( payload ) . copyHeaders ( message . getHeaders ( ) ) . setHeader ( "contentType" , contentType ) . build ( ) ;
return message ;
}
private Object deserializePayload ( Object payload , MimeType contentType ) {
if ( payload instanceof byte [ ] & & ( "text" . equalsIgnoreCase ( contentType . getType ( ) ) | | equalTypeAndSubType ( MimeTypeUtils . APPLICATION_JSON , contentType ) ) ) {
payload = new String ( ( byte [ ] ) ( ( byte [ ] ) payload ) , StandardCharsets . UTF_8 ) ;
}
return payload ;
}
}
protected static class MessageCollectorImpl implements MessageCollector {
private final Map < MessageChannel , BlockingQueue < Message < ? > > > results ;
MessageCollectorImpl ( ) {
this . results = new HashMap ( ) ;
}
private BlockingQueue < Message < ? > > register ( MessageChannel channel , boolean useNativeEncoding ) {
if ( ! useNativeEncoding ) {
( ( AbstractMessageChannel ) channel ) . addInterceptor ( new InboundMessageConvertingInterceptor ( ) ) ;
}
LinkedBlockingDeque < Message < ? > > result = new LinkedBlockingDeque ( ) ;
Assert . isTrue ( ! this . results . containsKey ( channel ) , "Channel [" + channel + "] was already bound" ) ;
this . results . put ( channel , result ) ;
return result ;
}
public BlockingQueue < Message < ? > > forChannel ( MessageChannel channel ) {
BlockingQueue < Message < ? > > queue = this . results . get ( channel ) ;
Assert . notNull ( queue , "Channel [" + channel + "] was not bound by " + RocketMQMessageChannelBinder . class ) ;
return queue ;
}
}
}