From 818a139696f242db8c310be21058c0f04045663f Mon Sep 17 00:00:00 2001 From: caotc <250622148@qq.com> Date: Fri, 23 Aug 2019 18:15:44 +0800 Subject: [PATCH 1/8] =?UTF-8?q?issue737=20=E5=AE=9E=E7=8E=B0rocketmq?= =?UTF-8?q?=E7=9A=84header=E7=9A=84value=E7=9A=84=E6=97=A0=E6=84=9F?= =?UTF-8?q?=E7=9F=A5=E5=BA=8F=E5=88=97=E5=8C=96=E5=92=8C=E5=8F=8D=E5=BA=8F?= =?UTF-8?q?=E5=88=97=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: caotc <250622148@qq.com> --- .../RocketMQMessageChannelBinder.java | 5 + .../RocketMQListenerBindingContainer.java | 21 +- .../integration/RocketMQMessageHandler.java | 23 +- .../support/AbstractRocketMQHeaderMapper.java | 47 ++++ .../support/JacksonRocketMQHeaderMapper.java | 246 ++++++++++++++++++ .../support/RocketMQHeaderMapper.java | 19 ++ 6 files changed, 357 insertions(+), 4 deletions(-) create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java create mode 100644 spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index bc35de93e..db65628f2 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -19,6 +19,8 @@ package com.alibaba.cloud.stream.binder.rocketmq; import java.util.HashMap; import java.util.Map; +import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -210,6 +212,9 @@ public class RocketMQMessageChannelBinder extends consumerProperties.getExtension().getDelayLevelWhenNextConsume()); listenerContainer .setNameServer(rocketBinderConfigurationProperties.getNameServer()); + RocketMQHeaderMapper headerMapper=new JacksonRocketMQHeaderMapper(this.getApplicationContext() + .getBeansOfType(ObjectMapper.class).values().iterator().next()); + listenerContainer.setHeaderMapper(headerMapper); RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( listenerContainer, consumerProperties, instrumentationManager); diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 5bf540e07..27be39a24 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -19,8 +19,11 @@ package com.alibaba.cloud.stream.binder.rocketmq.consuming; import static com.alibaba.cloud.stream.binder.rocketmq.RocketMQBinderConstants.ROCKETMQ_RECONSUME_TIMES; import java.util.List; +import java.util.Map; import java.util.Objects; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; +import com.google.common.collect.Maps; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -43,6 +46,8 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.context.SmartLifecycle; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.support.MutableMessageHeaders; import org.springframework.messaging.Message; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -88,6 +93,8 @@ public class RocketMQListenerBindingContainer private RocketMQListener rocketMQListener; + private RocketMQHeaderMapper headerMapper; + private DefaultMQPushConsumer consumer; private boolean running; @@ -369,6 +376,14 @@ public class RocketMQListenerBindingContainer return messageModel; } + public RocketMQHeaderMapper getHeaderMapper() { + return headerMapper; + } + + public void setHeaderMapper(RocketMQHeaderMapper headerMapper) { + this.headerMapper = headerMapper; + } + public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently { @@ -435,8 +450,10 @@ public class RocketMQListenerBindingContainer int reconsumeTimes = messageExt.getReconsumeTimes(); messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, String.valueOf(reconsumeTimes)); - - return RocketMQUtil.convertToSpringMessage(messageExt); + Message message=RocketMQUtil.convertToSpringMessage(messageExt); + Map afterMapperHeaders= Maps.newHashMap(); + headerMapper.toHeaders(messageExt.getProperties(),afterMapperHeaders); + return MessageBuilder.fromMessage(message).copyHeaders(afterMapperHeaders).build(); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 9186b064c..726064220 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -17,8 +17,12 @@ package com.alibaba.cloud.stream.binder.rocketmq.integration; import java.util.List; +import java.util.Map; import java.util.Optional; +import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; +import com.google.common.collect.Maps; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; @@ -37,6 +41,7 @@ import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.support.DefaultErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.ErrorMessage; @@ -62,6 +67,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private final RocketMQTemplate rocketMQTemplate; + private final RocketMQHeaderMapper headerMapper; + private final Boolean transactional; private final String destination; @@ -90,6 +97,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li this.instrumentationManager = instrumentationManager; this.producerProperties = producerProperties; this.partitioningInterceptor = partitioningInterceptor; + + this.headerMapper=new JacksonRocketMQHeaderMapper(rocketMQTemplate.getObjectMapper()); } @Override @@ -149,6 +158,14 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li protected void handleMessageInternal(org.springframework.messaging.Message message) throws Exception { try { + //issue 737 fix + Map jsonHeaders= Maps.newHashMap(); + headerMapper.fromHeaders(message.getHeaders(),jsonHeaders); + message = org.springframework.messaging.support.MessageBuilder + .fromMessage(message).copyHeaders(jsonHeaders) + .build(); + + final StringBuilder topicWithTags = new StringBuilder(destination); String tags = Optional .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("") @@ -156,6 +173,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li if (!StringUtils.isEmpty(tags)) { topicWithTags.append(":").append(tags); } + SendResult sendRes = null; if (transactional) { sendRes = rocketMQTemplate.sendMessageInTransaction(groupName, @@ -195,7 +213,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li log.debug("sync send to topic " + topicWithTags + " " + sendRes); } else { - SendCallback sendCallback = new SendCallback() { + Message finalMessage = message; + SendCallback sendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.debug("async send to topic " + topicWithTags + " " @@ -210,7 +229,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li getSendFailureChannel().send( RocketMQMessageHandler.this.errorMessageStrategy .buildErrorMessage(new MessagingException( - message, e), null)); + finalMessage, e), null)); } } }; diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java new file mode 100644 index 000000000..e91b6e2f7 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java @@ -0,0 +1,47 @@ +package com.alibaba.cloud.stream.binder.rocketmq.support; + +import org.apache.rocketmq.common.message.MessageConst; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +/** +* Base for RocketMQ header mappers. +* +* @author caotc +* @date 2019-08-22 +* @since 2.1.1 +*/ +public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper{ + private static final Charset DEFAULT_CHARSET=StandardCharsets.UTF_8; + + private Charset charset; + + public AbstractRocketMQHeaderMapper() { + this(DEFAULT_CHARSET); + } + + public AbstractRocketMQHeaderMapper(Charset charset) { + Assert.notNull(charset, "'charset' cannot be null"); + this.charset = charset; + } + + protected boolean matches(String headerName) { + return !MessageConst.STRING_HASH_SET.contains(headerName) && !MessageHeaders.ID.equals(headerName) + && !MessageHeaders.TIMESTAMP.equals(headerName) && !MessageHeaders.CONTENT_TYPE.equals(headerName) + && !MessageHeaders.REPLY_CHANNEL.equals(headerName) && !MessageHeaders.ERROR_CHANNEL.equals(headerName); + } + + public Charset getCharset() { + return charset; + } + + public void setCharset(Charset charset) { + Assert.notNull(charset, "'charset' cannot be null"); + this.charset = charset; + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java new file mode 100644 index 000000000..a03276d43 --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java @@ -0,0 +1,246 @@ +package com.alibaba.cloud.stream.binder.rocketmq.support; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.lang.Nullable; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.ClassUtils; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.*; + +/** + * jackson header mapper for RocketMQ. + * Header types are added to a special header {@link #JSON_TYPES}. + * + * @author caotc + * @date 2019-08-22 + * @since 2.1.1 + */ +public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{ + private final static Logger log = LoggerFactory + .getLogger(JacksonRocketMQHeaderMapper.class); + + private static final List DEFAULT_TRUSTED_PACKAGES = + Arrays.asList( + "java.lang", + "java.net", + "java.util", + "org.springframework.util" + ); + + /** + * Header name for java types of other headers. + */ + public static final String JSON_TYPES = "spring_json_header_types"; + + private final ObjectMapper objectMapper; + private final Set trustedPackages = new LinkedHashSet<>(DEFAULT_TRUSTED_PACKAGES); + + public JacksonRocketMQHeaderMapper(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + public JacksonRocketMQHeaderMapper(Charset charset, ObjectMapper objectMapper) { + super(charset); + this.objectMapper = objectMapper; + } + + @Override + public void fromHeaders(Map headers, Map target) { + final Map jsonHeaders = Maps.newHashMap(); + headers.forEach((key, value) -> { + if (matches(key)) { + if (value instanceof String) { + target.put(key, (String) value); + }else { + try { + String className = value.getClass().getName(); + target.put(key, objectMapper.writeValueAsString(value)); + jsonHeaders.put(key, className); + } + catch (Exception e) { + log.debug("Could not map " + key + " with type " + value.getClass().getName(),e); + } + } + } + }); + if (jsonHeaders.size() > 0) { + try { + target.put(JSON_TYPES, objectMapper.writeValueAsString(jsonHeaders)); + } + catch (IllegalStateException | JsonProcessingException e) { + log.error( "Could not add json types header",e); + } + } + } + + @Override + public void toHeaders(Map source, Map target) { + final Map jsonTypes = decodeJsonTypes(source); + source.forEach((key,value) -> { + if (!(key.equals(JSON_TYPES))) { + if (jsonTypes != null && jsonTypes.containsKey(key)) { + Class type = Object.class; + String requestedType = jsonTypes.get(key); + boolean trusted = trusted(requestedType); + if (trusted) { + try { + type = ClassUtils.forName(requestedType, null); + }catch (Exception e) { + log.error( "Could not load class for header: " + key,e); + } + } + + if (trusted) { + try { + Object val = decodeValue(value, type); + target.put(key, val); + } + catch (IOException e) { + log.error("Could not decode json type: " + value + " for key: " + + key,e); + target.put(key, value); + } + }else { + target.put(key, new NonTrustedHeaderType(value, requestedType)); + } + } + else { + target.put(key, value); + } + } + }); + } + + /** + * Add packages to the trusted packages list (default {@code java.util, java.lang}) used + * when constructing objects from JSON. + * If any of the supplied packages is {@code "*"}, all packages are trusted. + * If a class for a non-trusted package is encountered, the header is returned to the + * application with value of type {@link NonTrustedHeaderType}. + * @param packagesToTrust the packages to trust. + */ + public void addTrustedPackages(String... packagesToTrust) { + if (packagesToTrust != null) { + for (String whiteList : packagesToTrust) { + if ("*".equals(whiteList)) { + this.trustedPackages.clear(); + break; + } + else { + this.trustedPackages.add(whiteList); + } + } + } + } + + + public Set getTrustedPackages() { + return this.trustedPackages; + } + + public ObjectMapper getObjectMapper() { + return objectMapper; + } + + private Object decodeValue(String jsonString, Class type) throws IOException, LinkageError { + Object value = objectMapper.readValue(jsonString, type); + if (type.equals(NonTrustedHeaderType.class)) { + // Upstream NTHT propagated; may be trusted here... + NonTrustedHeaderType nth = (NonTrustedHeaderType) value; + if (trusted(nth.getUntrustedType())) { + try { + value = objectMapper.readValue(nth.getHeaderValue(), + ClassUtils.forName(nth.getUntrustedType(), null)); + } + catch (Exception e) { + log.error("Could not decode header: " + nth,e); + } + } + } + return value; + } + + @Nullable + private Map decodeJsonTypes(Map source) { + if(source.containsKey(JSON_TYPES)){ + String value=source.get(JSON_TYPES); + try { + return objectMapper.readValue(value,new TypeReference>(){}); + } + catch (IOException e) { + log.error("Could not decode json types: " + value,e); + } + } + return null; + } + + protected boolean trusted(String requestedType) { + if (requestedType.equals(NonTrustedHeaderType.class.getName())) { + return true; + } + if (!this.trustedPackages.isEmpty()) { + int lastDot = requestedType.lastIndexOf('.'); + if (lastDot < 0) { + return false; + } + String packageName = requestedType.substring(0, lastDot); + for (String trustedPackage : this.trustedPackages) { + if (packageName.equals(trustedPackage) || packageName.startsWith(trustedPackage + ".")) { + return true; + } + } + return false; + } + return true; + } + + /** + * Represents a header that could not be decoded due to an untrusted type. + */ + public static class NonTrustedHeaderType { + + private String headerValue; + + private String untrustedType; + + public NonTrustedHeaderType() { + super(); + } + + NonTrustedHeaderType(String headerValue, String untrustedType) { + this.headerValue = headerValue; + this.untrustedType = untrustedType; + } + + + public void setHeaderValue(String headerValue) { + this.headerValue = headerValue; + } + + public String getHeaderValue() { + return this.headerValue; + } + + public void setUntrustedType(String untrustedType) { + this.untrustedType = untrustedType; + } + + public String getUntrustedType() { + return this.untrustedType; + } + + @Override + public String toString() { + return "NonTrustedHeaderType [headerValue=" + headerValue + + ", untrustedType=" + this.untrustedType + "]"; + } + + } +} diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java new file mode 100644 index 000000000..0f2f09dce --- /dev/null +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java @@ -0,0 +1,19 @@ +package com.alibaba.cloud.stream.binder.rocketmq.support; + +import org.springframework.messaging.MessageHeaders; + +import java.util.Map; + +/** +* header value mapper for RocketMQ +* +* @author caotc +* @date 2019-08-22 +* @since 2.1.1 +*/ +public interface RocketMQHeaderMapper { + + void fromHeaders(Map headers, Map target); + + void toHeaders(Map source, Map target); +} From 3f9b8a6612cb3341b0ea5f4aaa5c80b5c93a353d Mon Sep 17 00:00:00 2001 From: caotc <250622148@qq.com> Date: Mon, 26 Aug 2019 14:44:19 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E5=B0=86headerMapper=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E6=96=B9=E6=B3=95=E7=BB=9F=E4=B8=80=E5=9C=A8Binder=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: caotc <250622148@qq.com> --- .../RocketMQMessageChannelBinder.java | 66 ++++++++++++------- .../RocketMQListenerBindingContainer.java | 7 +- .../integration/RocketMQMessageHandler.java | 15 +++-- .../RocketMQConsumerProperties.java | 17 +++++ .../support/JacksonRocketMQHeaderMapper.java | 21 ++++-- .../support/RocketMQHeaderMapper.java | 16 +++-- 6 files changed, 100 insertions(+), 42 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index db65628f2..915160507 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,11 +16,20 @@ package com.alibaba.cloud.stream.binder.rocketmq; -import java.util.HashMap; -import java.util.Map; - +import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; +import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; +import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; +import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource; +import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -29,11 +38,7 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQUtil; -import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder; -import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider; -import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; -import org.springframework.cloud.stream.binder.ExtendedProducerProperties; -import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; +import org.springframework.cloud.stream.binder.*; import org.springframework.cloud.stream.binding.MessageConverterConfigurer; import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; @@ -47,18 +52,7 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; -import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; -import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; -import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; -import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource; -import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; -import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; -import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.*; /** * @author Jim @@ -88,7 +82,7 @@ public class RocketMQMessageChannelBinder extends this.instrumentationManager = instrumentationManager; } - @Override + @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, MessageChannel channel, @@ -161,6 +155,8 @@ public class RocketMQMessageChannelBinder extends } } + + RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( rocketMQTemplate, destination.getName(), producerGroup, producerProperties.getExtension().getTransactional(), @@ -171,7 +167,7 @@ public class RocketMQMessageChannelBinder extends .findFirst().orElse(null)); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); messageHandler.setSync(producerProperties.getExtension().getSync()); - + messageHandler.setHeaderMapper(createHeaderMapper(producerProperties)); if (errorChannel != null) { messageHandler.setSendFailureChannel(errorChannel); } @@ -212,9 +208,7 @@ public class RocketMQMessageChannelBinder extends consumerProperties.getExtension().getDelayLevelWhenNextConsume()); listenerContainer .setNameServer(rocketBinderConfigurationProperties.getNameServer()); - RocketMQHeaderMapper headerMapper=new JacksonRocketMQHeaderMapper(this.getApplicationContext() - .getBeansOfType(ObjectMapper.class).values().iterator().next()); - listenerContainer.setHeaderMapper(headerMapper); + listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties)); RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( listenerContainer, consumerProperties, instrumentationManager); @@ -299,4 +293,26 @@ public class RocketMQMessageChannelBinder extends this.extendedBindingProperties = extendedBindingProperties; } + private RocketMQHeaderMapper createHeaderMapper( + final ExtendedConsumerProperties extendedConsumerProperties) { + Set trustedPackages = extendedConsumerProperties.getExtension() + .getTrustedPackages(); + return createHeaderMapper(trustedPackages); + } + + private RocketMQHeaderMapper createHeaderMapper( + final ExtendedProducerProperties producerProperties) { + return createHeaderMapper(Collections.emptyList()); + } + + private RocketMQHeaderMapper createHeaderMapper(Collection trustedPackages){ + ObjectMapper objectMapper=this.getApplicationContext() + .getBeansOfType(ObjectMapper.class).values().iterator().next(); + JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(objectMapper); + if (!StringUtils.isEmpty(trustedPackages)) { + headerMapper.addTrustedPackages(trustedPackages); + } + return headerMapper; + } + } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index 27be39a24..d0e166047 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -444,16 +444,15 @@ public class RocketMQListenerBindingContainer * @param messageExt the rocketmq message * @return the converted Spring {@link Message} */ - private Message convertToSpringMessage(MessageExt messageExt) { + @SuppressWarnings("unchecked") + private Message convertToSpringMessage(MessageExt messageExt) { // add reconsume-times header to messageExt int reconsumeTimes = messageExt.getReconsumeTimes(); messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, String.valueOf(reconsumeTimes)); Message message=RocketMQUtil.convertToSpringMessage(messageExt); - Map afterMapperHeaders= Maps.newHashMap(); - headerMapper.toHeaders(messageExt.getProperties(),afterMapperHeaders); - return MessageBuilder.fromMessage(message).copyHeaders(afterMapperHeaders).build(); + return MessageBuilder.fromMessage(message).copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build(); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index 726064220..f778964bb 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -67,7 +67,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private final RocketMQTemplate rocketMQTemplate; - private final RocketMQHeaderMapper headerMapper; + private RocketMQHeaderMapper headerMapper; private final Boolean transactional; @@ -97,8 +97,6 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li this.instrumentationManager = instrumentationManager; this.producerProperties = producerProperties; this.partitioningInterceptor = partitioningInterceptor; - - this.headerMapper=new JacksonRocketMQHeaderMapper(rocketMQTemplate.getObjectMapper()); } @Override @@ -159,8 +157,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li throws Exception { try { //issue 737 fix - Map jsonHeaders= Maps.newHashMap(); - headerMapper.fromHeaders(message.getHeaders(),jsonHeaders); + Map jsonHeaders=headerMapper.fromHeaders(message.getHeaders()); message = org.springframework.messaging.support.MessageBuilder .fromMessage(message).copyHeaders(jsonHeaders) .build(); @@ -296,4 +293,12 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li public void setSync(boolean sync) { this.sync = sync; } + + public RocketMQHeaderMapper getHeaderMapper() { + return headerMapper; + } + + public void setHeaderMapper(RocketMQHeaderMapper headerMapper) { + this.headerMapper = headerMapper; + } } \ No newline at end of file diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 9554e580b..f8a4b39bd 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -16,6 +16,7 @@ package com.alibaba.cloud.stream.binder.rocketmq.properties; +import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; import org.apache.rocketmq.client.consumer.MQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -23,6 +24,9 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import java.util.List; +import java.util.Set; + /** * @author Timur Valiev * @author Jim @@ -65,6 +69,11 @@ public class RocketMQConsumerProperties { private Boolean enabled = true; + /** + * {@link JacksonRocketMQHeaderMapper#addTrustedPackages(String...)} + */ + private Set trustedPackages; + // ------------ For Pull Consumer ------------ private long pullTimeout = 10 * 1000; @@ -148,4 +157,12 @@ public class RocketMQConsumerProperties { public boolean shouldRequeue() { return delayLevelWhenNextConsume != -1; } + + public Set getTrustedPackages() { + return trustedPackages; + } + + public void setTrustedPackages(Set trustedPackages) { + this.trustedPackages = trustedPackages; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java index a03276d43..c3d1d83c8 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java @@ -52,7 +52,8 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{ } @Override - public void fromHeaders(Map headers, Map target) { + public Map fromHeaders(MessageHeaders headers) { + final Map target = Maps.newHashMap(); final Map jsonHeaders = Maps.newHashMap(); headers.forEach((key, value) -> { if (matches(key)) { @@ -78,10 +79,12 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{ log.error( "Could not add json types header",e); } } + return target; } @Override - public void toHeaders(Map source, Map target) { + public MessageHeaders toHeaders(Map source) { + final Map target = Maps.newHashMap(); final Map jsonTypes = decodeJsonTypes(source); source.forEach((key,value) -> { if (!(key.equals(JSON_TYPES))) { @@ -116,6 +119,17 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{ } } }); + return new MessageHeaders(target); + } + + /** + * @param packagesToTrust the packages to trust. + * @see #addTrustedPackages(Collection) + */ + public void addTrustedPackages(String... packagesToTrust) { + if(Objects.nonNull(packagesToTrust)){ + addTrustedPackages(Arrays.asList(packagesToTrust)); + } } /** @@ -126,7 +140,7 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{ * application with value of type {@link NonTrustedHeaderType}. * @param packagesToTrust the packages to trust. */ - public void addTrustedPackages(String... packagesToTrust) { + public void addTrustedPackages(Collection packagesToTrust) { if (packagesToTrust != null) { for (String whiteList : packagesToTrust) { if ("*".equals(whiteList)) { @@ -140,7 +154,6 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{ } } - public Set getTrustedPackages() { return this.trustedPackages; } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java index 0f2f09dce..d34af0eac 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java @@ -12,8 +12,16 @@ import java.util.Map; * @since 2.1.1 */ public interface RocketMQHeaderMapper { - - void fromHeaders(Map headers, Map target); - - void toHeaders(Map source, Map target); + /** + * Map from the given {@link MessageHeaders} to the specified target message. + * @param headers the abstracted MessageHeaders. + * @return the native target message. + */ + Map fromHeaders(MessageHeaders headers); + /** + * Map from the given target message to abstracted {@link MessageHeaders}. + * @param source the native target message. + * @return the target headers. + */ + MessageHeaders toHeaders(Map source); } From 0a1f7120945f04e82c556b19996c3b428e14a99b Mon Sep 17 00:00:00 2001 From: caotc <250622148@qq.com> Date: Mon, 26 Aug 2019 15:28:20 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E5=B0=86rocketmq=E7=9A=84header=E8=BD=AC?= =?UTF-8?q?=E6=8D=A2=E4=B8=BAspring=E7=9A=84header=E6=97=B6=E4=B9=9F?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E8=BF=87=E6=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: caotc <250622148@qq.com> --- .../binder/rocketmq/support/JacksonRocketMQHeaderMapper.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java index c3d1d83c8..db5d446ba 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java @@ -87,7 +87,7 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{ final Map target = Maps.newHashMap(); final Map jsonTypes = decodeJsonTypes(source); source.forEach((key,value) -> { - if (!(key.equals(JSON_TYPES))) { + if (matches(key) && !(key.equals(JSON_TYPES))) { if (jsonTypes != null && jsonTypes.containsKey(key)) { Class type = Object.class; String requestedType = jsonTypes.get(key); @@ -113,8 +113,7 @@ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{ }else { target.put(key, new NonTrustedHeaderType(value, requestedType)); } - } - else { + }else { target.put(key, value); } } From 2cacc97d945598b78c8033b9e61957fb8f044640 Mon Sep 17 00:00:00 2001 From: caotc <250622148@qq.com> Date: Tue, 3 Sep 2019 13:38:19 +0800 Subject: [PATCH 4/8] format code Signed-off-by: caotc <250622148@qq.com> --- .../support/AbstractRocketMQHeaderMapper.java | 79 ++-- .../support/JacksonRocketMQHeaderMapper.java | 437 +++++++++--------- .../support/RocketMQHeaderMapper.java | 41 +- 3 files changed, 282 insertions(+), 275 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java index e91b6e2f7..be14bb389 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java @@ -1,47 +1,48 @@ package com.alibaba.cloud.stream.binder.rocketmq.support; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + import org.apache.rocketmq.common.message.MessageConst; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - /** -* Base for RocketMQ header mappers. -* -* @author caotc -* @date 2019-08-22 -* @since 2.1.1 -*/ -public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper{ - private static final Charset DEFAULT_CHARSET=StandardCharsets.UTF_8; - - private Charset charset; - - public AbstractRocketMQHeaderMapper() { - this(DEFAULT_CHARSET); - } - - public AbstractRocketMQHeaderMapper(Charset charset) { - Assert.notNull(charset, "'charset' cannot be null"); - this.charset = charset; - } - - protected boolean matches(String headerName) { - return !MessageConst.STRING_HASH_SET.contains(headerName) && !MessageHeaders.ID.equals(headerName) - && !MessageHeaders.TIMESTAMP.equals(headerName) && !MessageHeaders.CONTENT_TYPE.equals(headerName) - && !MessageHeaders.REPLY_CHANNEL.equals(headerName) && !MessageHeaders.ERROR_CHANNEL.equals(headerName); - } - - public Charset getCharset() { - return charset; - } - - public void setCharset(Charset charset) { - Assert.notNull(charset, "'charset' cannot be null"); - this.charset = charset; - } + * Base for RocketMQ header mappers. + * + * @author caotc + * @date 2019-08-22 + * @since 2.1.1 + */ +public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper { + private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; + + private Charset charset; + + public AbstractRocketMQHeaderMapper() { + this(DEFAULT_CHARSET); + } + + public AbstractRocketMQHeaderMapper(Charset charset) { + Assert.notNull(charset, "'charset' cannot be null"); + this.charset = charset; + } + + protected boolean matches(String headerName) { + return !MessageConst.STRING_HASH_SET.contains(headerName) + && !MessageHeaders.ID.equals(headerName) + && !MessageHeaders.TIMESTAMP.equals(headerName) + && !MessageHeaders.CONTENT_TYPE.equals(headerName) + && !MessageHeaders.REPLY_CHANNEL.equals(headerName) + && !MessageHeaders.ERROR_CHANNEL.equals(headerName); + } + + public Charset getCharset() { + return charset; + } + + public void setCharset(Charset charset) { + Assert.notNull(charset, "'charset' cannot be null"); + this.charset = charset; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java index db5d446ba..91c5abdf8 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java @@ -1,258 +1,263 @@ package com.alibaba.cloud.stream.binder.rocketmq.support; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.*; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.lang.Nullable; import org.springframework.messaging.MessageHeaders; import org.springframework.util.ClassUtils; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.*; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; /** - * jackson header mapper for RocketMQ. - * Header types are added to a special header {@link #JSON_TYPES}. + * jackson header mapper for RocketMQ. Header types are added to a special header + * {@link #JSON_TYPES}. * * @author caotc * @date 2019-08-22 * @since 2.1.1 */ -public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{ - private final static Logger log = LoggerFactory - .getLogger(JacksonRocketMQHeaderMapper.class); - - private static final List DEFAULT_TRUSTED_PACKAGES = - Arrays.asList( - "java.lang", - "java.net", - "java.util", - "org.springframework.util" - ); +public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper { + private final static Logger log = LoggerFactory + .getLogger(JacksonRocketMQHeaderMapper.class); - /** - * Header name for java types of other headers. - */ - public static final String JSON_TYPES = "spring_json_header_types"; + private static final List DEFAULT_TRUSTED_PACKAGES = Arrays + .asList("java.lang", "java.net", "java.util", "org.springframework.util"); - private final ObjectMapper objectMapper; - private final Set trustedPackages = new LinkedHashSet<>(DEFAULT_TRUSTED_PACKAGES); + /** + * Header name for java types of other headers. + */ + public static final String JSON_TYPES = "spring_json_header_types"; - public JacksonRocketMQHeaderMapper(ObjectMapper objectMapper) { - this.objectMapper = objectMapper; - } + private final ObjectMapper objectMapper; + private final Set trustedPackages = new LinkedHashSet<>( + DEFAULT_TRUSTED_PACKAGES); - public JacksonRocketMQHeaderMapper(Charset charset, ObjectMapper objectMapper) { - super(charset); - this.objectMapper = objectMapper; - } + public JacksonRocketMQHeaderMapper(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } - @Override - public Map fromHeaders(MessageHeaders headers) { - final Map target = Maps.newHashMap(); - final Map jsonHeaders = Maps.newHashMap(); - headers.forEach((key, value) -> { - if (matches(key)) { - if (value instanceof String) { - target.put(key, (String) value); - }else { - try { - String className = value.getClass().getName(); - target.put(key, objectMapper.writeValueAsString(value)); - jsonHeaders.put(key, className); - } - catch (Exception e) { - log.debug("Could not map " + key + " with type " + value.getClass().getName(),e); - } - } - } - }); - if (jsonHeaders.size() > 0) { - try { - target.put(JSON_TYPES, objectMapper.writeValueAsString(jsonHeaders)); - } - catch (IllegalStateException | JsonProcessingException e) { - log.error( "Could not add json types header",e); - } - } - return target; - } + public JacksonRocketMQHeaderMapper(Charset charset, ObjectMapper objectMapper) { + super(charset); + this.objectMapper = objectMapper; + } - @Override - public MessageHeaders toHeaders(Map source) { - final Map target = Maps.newHashMap(); - final Map jsonTypes = decodeJsonTypes(source); - source.forEach((key,value) -> { - if (matches(key) && !(key.equals(JSON_TYPES))) { - if (jsonTypes != null && jsonTypes.containsKey(key)) { - Class type = Object.class; - String requestedType = jsonTypes.get(key); - boolean trusted = trusted(requestedType); - if (trusted) { - try { - type = ClassUtils.forName(requestedType, null); - }catch (Exception e) { - log.error( "Could not load class for header: " + key,e); - } - } + @Override + public Map fromHeaders(MessageHeaders headers) { + final Map target = Maps.newHashMap(); + final Map jsonHeaders = Maps.newHashMap(); + headers.forEach((key, value) -> { + if (matches(key)) { + if (value instanceof String) { + target.put(key, (String) value); + } + else { + try { + String className = value.getClass().getName(); + target.put(key, objectMapper.writeValueAsString(value)); + jsonHeaders.put(key, className); + } + catch (Exception e) { + log.debug("Could not map " + key + " with type " + + value.getClass().getName(), e); + } + } + } + }); + if (jsonHeaders.size() > 0) { + try { + target.put(JSON_TYPES, objectMapper.writeValueAsString(jsonHeaders)); + } + catch (IllegalStateException | JsonProcessingException e) { + log.error("Could not add json types header", e); + } + } + return target; + } - if (trusted) { - try { - Object val = decodeValue(value, type); - target.put(key, val); - } - catch (IOException e) { - log.error("Could not decode json type: " + value + " for key: " - + key,e); - target.put(key, value); - } - }else { - target.put(key, new NonTrustedHeaderType(value, requestedType)); - } - }else { - target.put(key, value); - } - } - }); - return new MessageHeaders(target); - } + @Override + public MessageHeaders toHeaders(Map source) { + final Map target = Maps.newHashMap(); + final Map jsonTypes = decodeJsonTypes(source); + source.forEach((key, value) -> { + if (matches(key) && !(key.equals(JSON_TYPES))) { + if (jsonTypes != null && jsonTypes.containsKey(key)) { + Class type = Object.class; + String requestedType = jsonTypes.get(key); + boolean trusted = trusted(requestedType); + if (trusted) { + try { + type = ClassUtils.forName(requestedType, null); + } + catch (Exception e) { + log.error("Could not load class for header: " + key, e); + } + } - /** - * @param packagesToTrust the packages to trust. - * @see #addTrustedPackages(Collection) - */ - public void addTrustedPackages(String... packagesToTrust) { - if(Objects.nonNull(packagesToTrust)){ - addTrustedPackages(Arrays.asList(packagesToTrust)); - } - } + if (trusted) { + try { + Object val = decodeValue(value, type); + target.put(key, val); + } + catch (IOException e) { + log.error("Could not decode json type: " + value + + " for key: " + key, e); + target.put(key, value); + } + } + else { + target.put(key, new NonTrustedHeaderType(value, requestedType)); + } + } + else { + target.put(key, value); + } + } + }); + return new MessageHeaders(target); + } - /** - * Add packages to the trusted packages list (default {@code java.util, java.lang}) used - * when constructing objects from JSON. - * If any of the supplied packages is {@code "*"}, all packages are trusted. - * If a class for a non-trusted package is encountered, the header is returned to the - * application with value of type {@link NonTrustedHeaderType}. - * @param packagesToTrust the packages to trust. - */ - public void addTrustedPackages(Collection packagesToTrust) { - if (packagesToTrust != null) { - for (String whiteList : packagesToTrust) { - if ("*".equals(whiteList)) { - this.trustedPackages.clear(); - break; - } - else { - this.trustedPackages.add(whiteList); - } - } - } - } + /** + * @param packagesToTrust the packages to trust. + * @see #addTrustedPackages(Collection) + */ + public void addTrustedPackages(String... packagesToTrust) { + if (Objects.nonNull(packagesToTrust)) { + addTrustedPackages(Arrays.asList(packagesToTrust)); + } + } - public Set getTrustedPackages() { - return this.trustedPackages; - } + /** + * Add packages to the trusted packages list (default {@code java.util, java.lang}) + * used when constructing objects from JSON. If any of the supplied packages is + * {@code "*"}, all packages are trusted. If a class for a non-trusted package is + * encountered, the header is returned to the application with value of type + * {@link NonTrustedHeaderType}. + * @param packagesToTrust the packages to trust. + */ + public void addTrustedPackages(Collection packagesToTrust) { + if (packagesToTrust != null) { + for (String whiteList : packagesToTrust) { + if ("*".equals(whiteList)) { + this.trustedPackages.clear(); + break; + } + else { + this.trustedPackages.add(whiteList); + } + } + } + } - public ObjectMapper getObjectMapper() { - return objectMapper; - } + public Set getTrustedPackages() { + return this.trustedPackages; + } - private Object decodeValue(String jsonString, Class type) throws IOException, LinkageError { - Object value = objectMapper.readValue(jsonString, type); - if (type.equals(NonTrustedHeaderType.class)) { - // Upstream NTHT propagated; may be trusted here... - NonTrustedHeaderType nth = (NonTrustedHeaderType) value; - if (trusted(nth.getUntrustedType())) { - try { - value = objectMapper.readValue(nth.getHeaderValue(), - ClassUtils.forName(nth.getUntrustedType(), null)); - } - catch (Exception e) { - log.error("Could not decode header: " + nth,e); - } - } - } - return value; - } + public ObjectMapper getObjectMapper() { + return objectMapper; + } - @Nullable - private Map decodeJsonTypes(Map source) { - if(source.containsKey(JSON_TYPES)){ - String value=source.get(JSON_TYPES); - try { - return objectMapper.readValue(value,new TypeReference>(){}); - } - catch (IOException e) { - log.error("Could not decode json types: " + value,e); - } - } - return null; - } + private Object decodeValue(String jsonString, Class type) + throws IOException, LinkageError { + Object value = objectMapper.readValue(jsonString, type); + if (type.equals(NonTrustedHeaderType.class)) { + // Upstream NTHT propagated; may be trusted here... + NonTrustedHeaderType nth = (NonTrustedHeaderType) value; + if (trusted(nth.getUntrustedType())) { + try { + value = objectMapper.readValue(nth.getHeaderValue(), + ClassUtils.forName(nth.getUntrustedType(), null)); + } + catch (Exception e) { + log.error("Could not decode header: " + nth, e); + } + } + } + return value; + } - protected boolean trusted(String requestedType) { - if (requestedType.equals(NonTrustedHeaderType.class.getName())) { - return true; - } - if (!this.trustedPackages.isEmpty()) { - int lastDot = requestedType.lastIndexOf('.'); - if (lastDot < 0) { - return false; - } - String packageName = requestedType.substring(0, lastDot); - for (String trustedPackage : this.trustedPackages) { - if (packageName.equals(trustedPackage) || packageName.startsWith(trustedPackage + ".")) { - return true; - } - } - return false; - } - return true; - } + @Nullable + private Map decodeJsonTypes(Map source) { + if (source.containsKey(JSON_TYPES)) { + String value = source.get(JSON_TYPES); + try { + return objectMapper.readValue(value, + new TypeReference>() { + }); + } + catch (IOException e) { + log.error("Could not decode json types: " + value, e); + } + } + return null; + } - /** - * Represents a header that could not be decoded due to an untrusted type. - */ - public static class NonTrustedHeaderType { + protected boolean trusted(String requestedType) { + if (requestedType.equals(NonTrustedHeaderType.class.getName())) { + return true; + } + if (!this.trustedPackages.isEmpty()) { + int lastDot = requestedType.lastIndexOf('.'); + if (lastDot < 0) { + return false; + } + String packageName = requestedType.substring(0, lastDot); + for (String trustedPackage : this.trustedPackages) { + if (packageName.equals(trustedPackage) + || packageName.startsWith(trustedPackage + ".")) { + return true; + } + } + return false; + } + return true; + } - private String headerValue; + /** + * Represents a header that could not be decoded due to an untrusted type. + */ + public static class NonTrustedHeaderType { - private String untrustedType; + private String headerValue; - public NonTrustedHeaderType() { - super(); - } + private String untrustedType; - NonTrustedHeaderType(String headerValue, String untrustedType) { - this.headerValue = headerValue; - this.untrustedType = untrustedType; - } + public NonTrustedHeaderType() { + super(); + } + NonTrustedHeaderType(String headerValue, String untrustedType) { + this.headerValue = headerValue; + this.untrustedType = untrustedType; + } - public void setHeaderValue(String headerValue) { - this.headerValue = headerValue; - } + public void setHeaderValue(String headerValue) { + this.headerValue = headerValue; + } - public String getHeaderValue() { - return this.headerValue; - } + public String getHeaderValue() { + return this.headerValue; + } - public void setUntrustedType(String untrustedType) { - this.untrustedType = untrustedType; - } + public void setUntrustedType(String untrustedType) { + this.untrustedType = untrustedType; + } - public String getUntrustedType() { - return this.untrustedType; - } + public String getUntrustedType() { + return this.untrustedType; + } - @Override - public String toString() { - return "NonTrustedHeaderType [headerValue=" + headerValue - + ", untrustedType=" + this.untrustedType + "]"; - } + @Override + public String toString() { + return "NonTrustedHeaderType [headerValue=" + headerValue + ", untrustedType=" + + this.untrustedType + "]"; + } - } + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java index d34af0eac..ee5994442 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java @@ -1,27 +1,28 @@ package com.alibaba.cloud.stream.binder.rocketmq.support; -import org.springframework.messaging.MessageHeaders; - import java.util.Map; +import org.springframework.messaging.MessageHeaders; + /** -* header value mapper for RocketMQ -* -* @author caotc -* @date 2019-08-22 -* @since 2.1.1 -*/ + * header value mapper for RocketMQ + * + * @author caotc + * @date 2019-08-22 + * @since 2.1.1 + */ public interface RocketMQHeaderMapper { - /** - * Map from the given {@link MessageHeaders} to the specified target message. - * @param headers the abstracted MessageHeaders. - * @return the native target message. - */ - Map fromHeaders(MessageHeaders headers); - /** - * Map from the given target message to abstracted {@link MessageHeaders}. - * @param source the native target message. - * @return the target headers. - */ - MessageHeaders toHeaders(Map source); + /** + * Map from the given {@link MessageHeaders} to the specified target message. + * @param headers the abstracted MessageHeaders. + * @return the native target message. + */ + Map fromHeaders(MessageHeaders headers); + + /** + * Map from the given target message to abstracted {@link MessageHeaders}. + * @param source the native target message. + * @return the target headers. + */ + MessageHeaders toHeaders(Map source); } From 8ed8cd2dba32bb0efc81472f8990ee013c958372 Mon Sep 17 00:00:00 2001 From: caotc <250622148@qq.com> Date: Tue, 3 Sep 2019 15:40:32 +0800 Subject: [PATCH 5/8] format javadoc Signed-off-by: caotc <250622148@qq.com> --- .../binder/rocketmq/support/AbstractRocketMQHeaderMapper.java | 3 +-- .../binder/rocketmq/support/JacksonRocketMQHeaderMapper.java | 3 +-- .../stream/binder/rocketmq/support/RocketMQHeaderMapper.java | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java index be14bb389..79f47eb8b 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java @@ -11,8 +11,7 @@ import org.springframework.util.Assert; * Base for RocketMQ header mappers. * * @author caotc - * @date 2019-08-22 - * @since 2.1.1 + * @since 2.1.1.RELEASE */ public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper { private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java index 91c5abdf8..af56b5349 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java @@ -20,8 +20,7 @@ import com.google.common.collect.Maps; * {@link #JSON_TYPES}. * * @author caotc - * @date 2019-08-22 - * @since 2.1.1 + * @since 2.1.1.RELEASE */ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper { private final static Logger log = LoggerFactory diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java index ee5994442..acc5ae8d4 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java @@ -8,8 +8,7 @@ import org.springframework.messaging.MessageHeaders; * header value mapper for RocketMQ * * @author caotc - * @date 2019-08-22 - * @since 2.1.1 + * @since 2.1.1.RELEASE */ public interface RocketMQHeaderMapper { /** From d02a35b173165354f9c2af307f262aef9ce7d4c5 Mon Sep 17 00:00:00 2001 From: caotc <250622148@qq.com> Date: Tue, 3 Sep 2019 15:49:46 +0800 Subject: [PATCH 6/8] format code Signed-off-by: caotc <250622148@qq.com> --- .../RocketMQMessageChannelBinder.java | 54 +++++++++---------- .../RocketMQListenerBindingContainer.java | 7 +-- .../integration/RocketMQMessageHandler.java | 19 ++++--- 3 files changed, 39 insertions(+), 41 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 915160507..579153e15 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -82,11 +82,10 @@ public class RocketMQMessageChannelBinder extends this.instrumentationManager = instrumentationManager; } - @Override + @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties producerProperties, - MessageChannel channel, - MessageChannel errorChannel) throws Exception { + MessageChannel channel, MessageChannel errorChannel) throws Exception { if (producerProperties.getExtension().getEnabled()) { // if producerGroup is empty, using destination @@ -155,8 +154,6 @@ public class RocketMQMessageChannelBinder extends } } - - RocketMQMessageHandler messageHandler = new RocketMQMessageHandler( rocketMQTemplate, destination.getName(), producerGroup, producerProperties.getExtension().getTransactional(), @@ -167,7 +164,7 @@ public class RocketMQMessageChannelBinder extends .findFirst().orElse(null)); messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory()); messageHandler.setSync(producerProperties.getExtension().getSync()); - messageHandler.setHeaderMapper(createHeaderMapper(producerProperties)); + messageHandler.setHeaderMapper(createHeaderMapper(producerProperties)); if (errorChannel != null) { messageHandler.setSendFailureChannel(errorChannel); } @@ -208,7 +205,7 @@ public class RocketMQMessageChannelBinder extends consumerProperties.getExtension().getDelayLevelWhenNextConsume()); listenerContainer .setNameServer(rocketBinderConfigurationProperties.getNameServer()); - listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties)); + listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties)); RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter( listenerContainer, consumerProperties, instrumentationManager); @@ -293,26 +290,27 @@ public class RocketMQMessageChannelBinder extends this.extendedBindingProperties = extendedBindingProperties; } - private RocketMQHeaderMapper createHeaderMapper( - final ExtendedConsumerProperties extendedConsumerProperties) { - Set trustedPackages = extendedConsumerProperties.getExtension() - .getTrustedPackages(); - return createHeaderMapper(trustedPackages); - } - - private RocketMQHeaderMapper createHeaderMapper( - final ExtendedProducerProperties producerProperties) { - return createHeaderMapper(Collections.emptyList()); - } - - private RocketMQHeaderMapper createHeaderMapper(Collection trustedPackages){ - ObjectMapper objectMapper=this.getApplicationContext() - .getBeansOfType(ObjectMapper.class).values().iterator().next(); - JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper(objectMapper); - if (!StringUtils.isEmpty(trustedPackages)) { - headerMapper.addTrustedPackages(trustedPackages); - } - return headerMapper; - } + private RocketMQHeaderMapper createHeaderMapper( + final ExtendedConsumerProperties extendedConsumerProperties) { + Set trustedPackages = extendedConsumerProperties.getExtension() + .getTrustedPackages(); + return createHeaderMapper(trustedPackages); + } + + private RocketMQHeaderMapper createHeaderMapper( + final ExtendedProducerProperties producerProperties) { + return createHeaderMapper(Collections.emptyList()); + } + + private RocketMQHeaderMapper createHeaderMapper(Collection trustedPackages) { + ObjectMapper objectMapper = this.getApplicationContext() + .getBeansOfType(ObjectMapper.class).values().iterator().next(); + JacksonRocketMQHeaderMapper headerMapper = new JacksonRocketMQHeaderMapper( + objectMapper); + if (!StringUtils.isEmpty(trustedPackages)) { + headerMapper.addTrustedPackages(trustedPackages); + } + return headerMapper; + } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java index d0e166047..6a9e4cd21 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/consuming/RocketMQListenerBindingContainer.java @@ -445,14 +445,15 @@ public class RocketMQListenerBindingContainer * @return the converted Spring {@link Message} */ @SuppressWarnings("unchecked") - private Message convertToSpringMessage(MessageExt messageExt) { + private Message convertToSpringMessage(MessageExt messageExt) { // add reconsume-times header to messageExt int reconsumeTimes = messageExt.getReconsumeTimes(); messageExt.putUserProperty(ROCKETMQ_RECONSUME_TIMES, String.valueOf(reconsumeTimes)); - Message message=RocketMQUtil.convertToSpringMessage(messageExt); - return MessageBuilder.fromMessage(message).copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build(); + Message message = RocketMQUtil.convertToSpringMessage(messageExt); + return MessageBuilder.fromMessage(message) + .copyHeaders(headerMapper.toHeaders(messageExt.getProperties())).build(); } } diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java index f778964bb..1f2f7be7c 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/RocketMQMessageHandler.java @@ -67,7 +67,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li private final RocketMQTemplate rocketMQTemplate; - private RocketMQHeaderMapper headerMapper; + private RocketMQHeaderMapper headerMapper; private final Boolean transactional; @@ -156,12 +156,11 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li protected void handleMessageInternal(org.springframework.messaging.Message message) throws Exception { try { - //issue 737 fix - Map jsonHeaders=headerMapper.fromHeaders(message.getHeaders()); - message = org.springframework.messaging.support.MessageBuilder - .fromMessage(message).copyHeaders(jsonHeaders) - .build(); - + // issue 737 fix + Map jsonHeaders = headerMapper + .fromHeaders(message.getHeaders()); + message = org.springframework.messaging.support.MessageBuilder + .fromMessage(message).copyHeaders(jsonHeaders).build(); final StringBuilder topicWithTags = new StringBuilder(destination); String tags = Optional @@ -210,8 +209,8 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li log.debug("sync send to topic " + topicWithTags + " " + sendRes); } else { - Message finalMessage = message; - SendCallback sendCallback = new SendCallback() { + Message finalMessage = message; + SendCallback sendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.debug("async send to topic " + topicWithTags + " " @@ -226,7 +225,7 @@ public class RocketMQMessageHandler extends AbstractMessageHandler implements Li getSendFailureChannel().send( RocketMQMessageHandler.this.errorMessageStrategy .buildErrorMessage(new MessagingException( - finalMessage, e), null)); + finalMessage, e), null)); } } }; From 7f0766f89edecabeb20da469477dbe2aef885891 Mon Sep 17 00:00:00 2001 From: caotc <250622148@qq.com> Date: Tue, 3 Sep 2019 15:52:12 +0800 Subject: [PATCH 7/8] add license Signed-off-by: caotc <250622148@qq.com> --- .../support/AbstractRocketMQHeaderMapper.java | 16 ++++++++++++++++ .../support/JacksonRocketMQHeaderMapper.java | 16 ++++++++++++++++ .../rocketmq/support/RocketMQHeaderMapper.java | 16 ++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java index 79f47eb8b..bd9e997b6 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/AbstractRocketMQHeaderMapper.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.alibaba.cloud.stream.binder.rocketmq.support; import java.nio.charset.Charset; diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java index af56b5349..41605858f 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/JacksonRocketMQHeaderMapper.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.alibaba.cloud.stream.binder.rocketmq.support; import java.io.IOException; diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java index acc5ae8d4..e291e8c7e 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQHeaderMapper.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.alibaba.cloud.stream.binder.rocketmq.support; import java.util.Map; From dc777c94b821a1d9b0849516f8d7d7ab61d28366 Mon Sep 17 00:00:00 2001 From: caotc <250622148@qq.com> Date: Tue, 3 Sep 2019 16:01:29 +0800 Subject: [PATCH 8/8] format import Signed-off-by: caotc <250622148@qq.com> --- .../RocketMQMessageChannelBinder.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java index 579153e15..c6ee54ca8 100644 --- a/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java +++ b/spring-cloud-stream-binder-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageChannelBinder.java @@ -16,20 +16,8 @@ package com.alibaba.cloud.stream.binder.rocketmq; -import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; -import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; -import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; -import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource; -import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; -import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; -import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; -import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; -import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; -import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; -import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.*; + import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -52,7 +40,20 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.util.StringUtils; -import java.util.*; +import com.alibaba.cloud.stream.binder.rocketmq.consuming.RocketMQListenerBindingContainer; +import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter; +import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageHandler; +import com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQMessageSource; +import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties; +import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQProducerProperties; +import com.alibaba.cloud.stream.binder.rocketmq.provisioning.RocketMQTopicProvisioner; +import com.alibaba.cloud.stream.binder.rocketmq.provisioning.selector.PartitionMessageQueueSelector; +import com.alibaba.cloud.stream.binder.rocketmq.support.JacksonRocketMQHeaderMapper; +import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQHeaderMapper; +import com.fasterxml.jackson.databind.ObjectMapper; /** * @author Jim