diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
index db65628f2..915160507 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java
@@ -16,11 +16,20 @@
package com.alibaba.cloud.stream.binder.rocketmq;
-import java.util.HashMap;
-import java.util.Map;
-
+import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
+import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
+import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
+import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
+import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -29,11 +38,7 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
-import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
-import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
-import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
-import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
-import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
+import org.springframework.cloud.stream.binder.*;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
@@ -47,18 +52,7 @@ import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;
-import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer;
-import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter;
-import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler;
-import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource;
-import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties;
-import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner;
-import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.*;
/**
* @author Jim
@@ -88,7 +82,7 @@ public class RocketMQMessageChannelBinder extends
this.instrumentationManager = instrumentationManager;
}
- @Override
+ @Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ExtendedProducerProperties producerProperties,
MessageChannel channel,
@@ -161,6 +155,8 @@ public class RocketMQMessageChannelBinder extends
}
}
+
+
RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
rocketMQTemplate, destination.getName(), producerGroup,
producerProperties.getExtension().getTransactional(),
@@ -171,7 +167,7 @@ public class RocketMQMessageChannelBinder extends
.findFirst().orElse(null));
messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
messageHandler.setSync(producerProperties.getExtension().getSync());
-
+ messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
if (errorChannel != null) {
messageHandler.setSendFailureChannel(errorChannel);
}
@@ -212,9 +208,7 @@ public class RocketMQMessageChannelBinder extends
consumerProperties.getExtension().getDelayLevelWhenNextConsume());
listenerContainer
.setNameServer(rocketBinderConfigurationProperties.getNameServer());
- RocketMQHeaderMapper headerMapper=new JacksonRocketMQHeaderMapper(this.getApplicationContext()
- .getBeansOfType(ObjectMapper.class).values().iterator().next());
- listenerContainer.setHeaderMapper(headerMapper);
+ listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));
RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
listenerContainer, consumerProperties, instrumentationManager);
@@ -299,4 +293,26 @@ public class RocketMQMessageChannelBinder extends
this.extendedBindingProperties = extendedBindingProperties;
}
+ private RocketMQHeaderMapper createHeaderMapper(
+ final ExtendedConsumerProperties extendedConsumerProperties) {
+ Set trustedPackages = extendedConsumerProperties.getExtension()
+ .getTrustedPackages();
+ return createHeaderMapper(trustedPackages);
+ }
+
+ private RocketMQHeaderMapper createHeaderMapper(
+ final ExtendedProducerProperties producerProperties) {
+ return createHeaderMapper(Collections.emptyList());
+ }
+
+ private RocketMQHeaderMapper createHeaderMapper(Collection trustedPackages){
+ ObjectMapper objectMapper=this.getApplicationContext()
+ .getBeansOfType(ObjectMapper.class).values().iterator().next();
+ JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(objectMapper);
+ if (!StringUtils.isEmpty(trustedPackages)) {
+ headerMapper.addTrustedPackages(trustedPackages);
+ }
+ return headerMapper;
+ }
+
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
index 27be39a24..d0e166047 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java
@@ -444,16 +444,15 @@ public class RocketMQListenerBindingContainer
* @param messageExt the rocketmq message
* @return the converted Spring {@link Message}
*/
- private Message convertToSpringMessage(MessageExt messageExt) {
+ @SuppressWarnings("unchecked")
+ private Message convertToSpringMessage(MessageExt messageExt) {
// add reconsume-times header to messageExt
int reconsumeTimes = messageExt.getReconsumeTimes();
messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES,
String.valueOf(reconsumeTimes));
Message message=RocketMQUtil.convertToSpringMessage(messageExt);
- Map afterMapperHeaders= Maps.newHashMap();
- headerMapper.toHeaders(messageExt.getProperties(),afterMapperHeaders);
- return MessageBuilder.fromMessage(message).copyHeaders(afterMapperHeaders).build();
+ return MessageBuilder.fromMessage(message).copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build();
}
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
index 726064220..f778964bb 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java
@@ -67,7 +67,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
private final RocketMQTemplate rocketMQTemplate;
- private final RocketMQHeaderMapper headerMapper;
+ private RocketMQHeaderMapper headerMapper;
private final Boolean transactional;
@@ -97,8 +97,6 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
this.instrumentationManager = instrumentationManager;
this.producerProperties = producerProperties;
this.partitioningInterceptor = partitioningInterceptor;
-
- this.headerMapper=new JacksonRocketMQHeaderMapper(rocketMQTemplate.getObjectMapper());
}
@Override
@@ -159,8 +157,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
throws Exception {
try {
//issue 737 fix
- Map jsonHeaders= Maps.newHashMap();
- headerMapper.fromHeaders(message.getHeaders(),jsonHeaders);
+ Map jsonHeaders=headerMapper.fromHeaders(message.getHeaders());
message = org.springframework.messaging.support.MessageBuilder
.fromMessage(message).copyHeaders(jsonHeaders)
.build();
@@ -296,4 +293,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li
public void setSync(boolean sync) {
this.sync = sync;
}
+
+ public RocketMQHeaderMapper getHeaderMapper() {
+ return headerMapper;
+ }
+
+ public void setHeaderMapper(RocketMQHeaderMapper headerMapper) {
+ this.headerMapper = headerMapper;
+ }
}
\ No newline at end of file
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
index 9554e580b..f8a4b39bd 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java
@@ -16,6 +16,7 @@
package com.alibaba.cloud.stream.binder.rocketmq.properties;
+import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -23,6 +24,9 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import java.util.List;
+import java.util.Set;
+
/**
* @author Timur Valiev
* @author Jim
@@ -65,6 +69,11 @@ public class RocketMQConsumerProperties {
private Boolean enabled = true;
+ /**
+ * {@link JacksonRocketMQHeaderMapper#addTrustedPackages(String...)}
+ */
+ private Set trustedPackages;
+
// ------------ For Pull Consumer ------------
private long pullTimeout = 10 * 1000;
@@ -148,4 +157,12 @@ public class RocketMQConsumerProperties {
public boolean shouldRequeue() {
return delayLevelWhenNextConsume != -1;
}
+
+ public Set getTrustedPackages() {
+ return trustedPackages;
+ }
+
+ public void setTrustedPackages(Set trustedPackages) {
+ this.trustedPackages = trustedPackages;
+ }
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java
index a03276d43..c3d1d83c8 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java
@@ -52,7 +52,8 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
}
@Override
- public void fromHeaders(Map headers, Map target) {
+ public Map fromHeaders(MessageHeaders headers) {
+ final Map target = Maps.newHashMap();
final Map jsonHeaders = Maps.newHashMap();
headers.forEach((key, value) -> {
if (matches(key)) {
@@ -78,10 +79,12 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
log.error( "Could not add json types header",e);
}
}
+ return target;
}
@Override
- public void toHeaders(Map source, Map target) {
+ public MessageHeaders toHeaders(Map source) {
+ final Map target = Maps.newHashMap();
final Map jsonTypes = decodeJsonTypes(source);
source.forEach((key,value) -> {
if (!(key.equals(JSON_TYPES))) {
@@ -116,6 +119,17 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
}
}
});
+ return new MessageHeaders(target);
+ }
+
+ /**
+ * @param packagesToTrust the packages to trust.
+ * @see #addTrustedPackages(Collection)
+ */
+ public void addTrustedPackages(String... packagesToTrust) {
+ if(Objects.nonNull(packagesToTrust)){
+ addTrustedPackages(Arrays.asList(packagesToTrust));
+ }
}
/**
@@ -126,7 +140,7 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
* application with value of type {@link NonTrustedHeaderType}.
* @param packagesToTrust the packages to trust.
*/
- public void addTrustedPackages(String... packagesToTrust) {
+ public void addTrustedPackages(Collection packagesToTrust) {
if (packagesToTrust != null) {
for (String whiteList : packagesToTrust) {
if ("*".equals(whiteList)) {
@@ -140,7 +154,6 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{
}
}
-
public Set getTrustedPackages() {
return this.trustedPackages;
}
diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java
index 0f2f09dce..d34af0eac 100644
--- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java
+++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java
@@ -12,8 +12,16 @@ import java.util.Map;
* @since 2.1.1
*/
public interface RocketMQHeaderMapper {
-
- void fromHeaders(Map headers, Map target);
-
- void toHeaders(Map source, Map target);
+ /**
+ * Map from the given {@link MessageHeaders} to the specified target message.
+ * @param headers the abstracted MessageHeaders.
+ * @return the native target message.
+ */
+ Map fromHeaders(MessageHeaders headers);
+ /**
+ * Map from the given target message to abstracted {@link MessageHeaders}.
+ * @param source the native target message.
+ * @return the target headers.
+ */
+ MessageHeaders toHeaders(Map source);
}