将headerMapper创建方法统一在Binder中

Signed-off-by: caotc <250622148@qq.com>
pull/880/head
caotc 6 years ago
parent 818a139696
commit 3f9b8a6612

@ -16,11 +16,20 @@
package com.alibaba.cloud.stream.binder.rocketmq; package com.alibaba.cloud.stream.binder.rocketmq;
import java.util.HashMap; import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
import java.util.Map; import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
@ -29,11 +38,7 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil; import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer; import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination;
@ -47,18 +52,7 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; import java.util.*;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.fasterxml.jackson.databind.ObjectMapper;
/** /**
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -161,6 +155,8 @@ public class RocketMQMessageChannelBinder extends
} }
} }
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
rocketMQTemplate, destination.getName(), producerGroup, rocketMQTemplate, destination.getName(), producerGroup,
producerProperties.getExtension().getTransactional(), producerProperties.getExtension().getTransactional(),
@ -171,7 +167,7 @@ public class RocketMQMessageChannelBinder extends
.findFirst().orElse(null)); .findFirst().orElse(null));
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
messageHandler.setSync(producerProperties.getExtension().getSync()); messageHandler.setSync(producerProperties.getExtension().getSync());
messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
if (errorChannel != null) { if (errorChannel != null) {
messageHandler.setSendFailureChannel(errorChannel); messageHandler.setSendFailureChannel(errorChannel);
} }
@ -212,9 +208,7 @@ public class RocketMQMessageChannelBinder extends
consumerProperties.getExtension().getDelayLevelWhenNextConsume()); consumerProperties.getExtension().getDelayLevelWhenNextConsume());
listenerContainer listenerContainer
.setNameServer(rocketBinderConfigurationProperties.getNameServer()); .setNameServer(rocketBinderConfigurationProperties.getNameServer());
RocketMQHeaderMapper headerMapper=new JacksonRocketMQHeaderMapper(this.getApplicationContext() listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));
.getBeansOfType(ObjectMapper.class).values().iterator().next());
listenerContainer.setHeaderMapper(headerMapper);
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
listenerContainer, consumerProperties, instrumentationManager); listenerContainer, consumerProperties, instrumentationManager);
@ -299,4 +293,26 @@ public class RocketMQMessageChannelBinder extends
this.extendedBindingProperties = extendedBindingProperties; this.extendedBindingProperties = extendedBindingProperties;
} }
private RocketMQHeaderMapper createHeaderMapper(
final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
Set<String> trustedPackages = extendedConsumerProperties.getExtension()
.getTrustedPackages();
return createHeaderMapper(trustedPackages);
}
private RocketMQHeaderMapper createHeaderMapper(
final ExtendedProducerProperties<RocketMQProducerProperties> producerProperties) {
return createHeaderMapper(Collections.emptyList());
}
private RocketMQHeaderMapper createHeaderMapper(Collection<String> trustedPackages){
ObjectMapper objectMapper=this.getApplicationContext()
.getBeansOfType(ObjectMapper.class).values().iterator().next();
JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(objectMapper);
if (!StringUtils.isEmpty(trustedPackages)) {
headerMapper.addTrustedPackages(trustedPackages);
}
return headerMapper;
}
} }

@ -444,6 +444,7 @@ public class RocketMQListenerBindingContainer
* @param messageExt the rocketmq message * @param messageExt the rocketmq message
* @return the converted Spring {@link Message} * @return the converted Spring {@link Message}
*/ */
@SuppressWarnings("unchecked")
private Message convertToSpringMessage(MessageExt messageExt) { private Message convertToSpringMessage(MessageExt messageExt) {
// add reconsume-times header to messageExt // add reconsume-times header to messageExt
@ -451,9 +452,7 @@ public class RocketMQListenerBindingContainer
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
String.valueOf(reconsumeTimes)); String.valueOf(reconsumeTimes));
Message message=RocketMQUtil.convertToSpringMessage(messageExt); Message message=RocketMQUtil.convertToSpringMessage(messageExt);
Map<String,Object> afterMapperHeaders= Maps.newHashMap(); return MessageBuilder.fromMessage(message).copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
headerMapper.toHeaders(messageExt.getProperties(),afterMapperHeaders);
return MessageBuilder.fromMessage(message).copyHeaders(afterMapperHeaders).build();
} }
} }

@ -67,7 +67,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private final RocketMQTemplate rocketMQTemplate; private final RocketMQTemplate rocketMQTemplate;
private final RocketMQHeaderMapper headerMapper; private RocketMQHeaderMapper headerMapper;
private final Boolean transactional; private final Boolean transactional;
@ -97,8 +97,6 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
this.instrumentationManager = instrumentationManager; this.instrumentationManager = instrumentationManager;
this.producerProperties = producerProperties; this.producerProperties = producerProperties;
this.partitioningInterceptor = partitioningInterceptor; this.partitioningInterceptor = partitioningInterceptor;
this.headerMapper=new JacksonRocketMQHeaderMapper(rocketMQTemplate.getObjectMapper());
} }
@Override @Override
@ -159,8 +157,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
throws Exception { throws Exception {
try { try {
//issue 737 fix //issue 737 fix
Map<String,String> jsonHeaders= Maps.newHashMap(); Map<String,String> jsonHeaders=headerMapper.fromHeaders(message.getHeaders());
headerMapper.fromHeaders(message.getHeaders(),jsonHeaders);
message = org.springframework.messaging.support.MessageBuilder message = org.springframework.messaging.support.MessageBuilder
.fromMessage(message).copyHeaders(jsonHeaders) .fromMessage(message).copyHeaders(jsonHeaders)
.build(); .build();
@ -296,4 +293,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
public void setSync(boolean sync) { public void setSync(boolean sync) {
this.sync = sync; this.sync = sync;
} }
public RocketMQHeaderMapper getHeaderMapper() {
return headerMapper;
}
public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
this.headerMapper = headerMapper;
}
} }

@ -16,6 +16,7 @@
package com.alibaba.cloud.stream.binder.rocketmq.properties; package com.alibaba.cloud.stream.binder.rocketmq.properties;
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@ -23,6 +24,9 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
import java.util.Set;
/** /**
* @author Timur Valiev * @author Timur Valiev
* @author <a href="mailto:fangjian0423@gmail.com">Jim</a> * @author <a href="mailto:fangjian0423@gmail.com">Jim</a>
@ -65,6 +69,11 @@ public class RocketMQConsumerProperties {
private Boolean enabled = true; private Boolean enabled = true;
/**
* {@link JacksonRocketMQHeaderMapper#addTrustedPackages(String...)}
*/
private Set<String> trustedPackages;
// ------------ For Pull Consumer ------------ // ------------ For Pull Consumer ------------
private long pullTimeout = 10 * 1000; private long pullTimeout = 10 * 1000;
@ -148,4 +157,12 @@ public class RocketMQConsumerProperties {
public boolean shouldRequeue() { public boolean shouldRequeue() {
return delayLevelWhenNextConsume != -1; return delayLevelWhenNextConsume != -1;
} }
public Set<String> getTrustedPackages() {
return trustedPackages;
}
public void setTrustedPackages(Set<String> trustedPackages) {
this.trustedPackages = trustedPackages;
}
} }

@ -52,7 +52,8 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
} }
@Override @Override
public void fromHeaders(Map<String,Object> headers, Map<String, String> target) { public Map<String,String> fromHeaders(MessageHeaders headers) {
final Map<String, String> target = Maps.newHashMap();
final Map<String, String> jsonHeaders = Maps.newHashMap(); final Map<String, String> jsonHeaders = Maps.newHashMap();
headers.forEach((key, value) -> { headers.forEach((key, value) -> {
if (matches(key)) { if (matches(key)) {
@ -78,10 +79,12 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
log.error( "Could not add json types header",e); log.error( "Could not add json types header",e);
} }
} }
return target;
} }
@Override @Override
public void toHeaders(Map<String, String> source, Map<String,Object> target) { public MessageHeaders toHeaders(Map<String,String> source) {
final Map<String, Object> target = Maps.newHashMap();
final Map<String, String> jsonTypes = decodeJsonTypes(source); final Map<String, String> jsonTypes = decodeJsonTypes(source);
source.forEach((key,value) -> { source.forEach((key,value) -> {
if (!(key.equals(JSON_TYPES))) { if (!(key.equals(JSON_TYPES))) {
@ -116,6 +119,17 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
} }
} }
}); });
return new MessageHeaders(target);
}
/**
* @param packagesToTrust the packages to trust.
* @see #addTrustedPackages(Collection)
*/
public void addTrustedPackages(String... packagesToTrust) {
if(Objects.nonNull(packagesToTrust)){
addTrustedPackages(Arrays.asList(packagesToTrust));
}
} }
/** /**
@ -126,7 +140,7 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
* application with value of type {@link NonTrustedHeaderType}. * application with value of type {@link NonTrustedHeaderType}.
* @param packagesToTrust the packages to trust. * @param packagesToTrust the packages to trust.
*/ */
public void addTrustedPackages(String... packagesToTrust) { public void addTrustedPackages(Collection<String> packagesToTrust) {
if (packagesToTrust != null) { if (packagesToTrust != null) {
for (String whiteList : packagesToTrust) { for (String whiteList : packagesToTrust) {
if ("*".equals(whiteList)) { if ("*".equals(whiteList)) {
@ -140,7 +154,6 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
} }
} }
public Set<String> getTrustedPackages() { public Set<String> getTrustedPackages() {
return this.trustedPackages; return this.trustedPackages;
} }

@ -12,8 +12,16 @@ import java.util.Map;
* @since 2.1.1 * @since 2.1.1
*/ */
public interface RocketMQHeaderMapper { public interface RocketMQHeaderMapper {
/**
void fromHeaders(Map<String,Object> headers, Map<String,String> target); * Map from the given {@link MessageHeaders} to the specified target message.
* @param headers the abstracted MessageHeaders.
void toHeaders(Map<String,String> source, Map<String,Object> target); * @return the native target message.
*/
Map<String,String> fromHeaders(MessageHeaders headers);
/**
* Map from the given target message to abstracted {@link MessageHeaders}.
* @param source the native target message.
* @return the target headers.
*/
MessageHeaders toHeaders(Map<String,String> source);
} }

Loading…
Cancel
Save