format code

Signed-off-by: caotc <250622148@qq.com>
pull/880/head
caotc 6 years ago
parent 0a1f712094
commit 2cacc97d94

@ -1,47 +1,48 @@
package com.alibaba.cloud.stream.binder.rocketmq.support; package com.alibaba.cloud.stream.binder.rocketmq.support;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
/** /**
* Base for RocketMQ header mappers. * Base for RocketMQ header mappers.
* *
* @author caotc * @author caotc
* @date 2019-08-22 * @date 2019-08-22
* @since 2.1.1 * @since 2.1.1
*/ */
public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper{ public abstract class AbstractRocketMQHeaderMapper implements RocketMQHeaderMapper {
private static final Charset DEFAULT_CHARSET=StandardCharsets.UTF_8; private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
private Charset charset; private Charset charset;
public AbstractRocketMQHeaderMapper() { public AbstractRocketMQHeaderMapper() {
this(DEFAULT_CHARSET); this(DEFAULT_CHARSET);
} }
public AbstractRocketMQHeaderMapper(Charset charset) { public AbstractRocketMQHeaderMapper(Charset charset) {
Assert.notNull(charset, "'charset' cannot be null"); Assert.notNull(charset, "'charset' cannot be null");
this.charset = charset; this.charset = charset;
} }
protected boolean matches(String headerName) { protected boolean matches(String headerName) {
return !MessageConst.STRING_HASH_SET.contains(headerName) && !MessageHeaders.ID.equals(headerName) return !MessageConst.STRING_HASH_SET.contains(headerName)
&& !MessageHeaders.TIMESTAMP.equals(headerName) && !MessageHeaders.CONTENT_TYPE.equals(headerName) && !MessageHeaders.ID.equals(headerName)
&& !MessageHeaders.REPLY_CHANNEL.equals(headerName) && !MessageHeaders.ERROR_CHANNEL.equals(headerName); && !MessageHeaders.TIMESTAMP.equals(headerName)
} && !MessageHeaders.CONTENT_TYPE.equals(headerName)
&& !MessageHeaders.REPLY_CHANNEL.equals(headerName)
public Charset getCharset() { && !MessageHeaders.ERROR_CHANNEL.equals(headerName);
return charset; }
}
public Charset getCharset() {
public void setCharset(Charset charset) { return charset;
Assert.notNull(charset, "'charset' cannot be null"); }
this.charset = charset;
} public void setCharset(Charset charset) {
Assert.notNull(charset, "'charset' cannot be null");
this.charset = charset;
}
} }

@ -1,258 +1,263 @@
package com.alibaba.cloud.stream.binder.rocketmq.support; package com.alibaba.cloud.stream.binder.rocketmq.support;
import com.fasterxml.jackson.core.JsonProcessingException; import java.io.IOException;
import com.fasterxml.jackson.core.type.TypeReference; import java.nio.charset.Charset;
import com.fasterxml.jackson.databind.ObjectMapper; import java.util.*;
import com.google.common.collect.Maps;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import org.springframework.util.ClassUtils; import org.springframework.util.ClassUtils;
import java.io.IOException; import com.fasterxml.jackson.core.JsonProcessingException;
import java.nio.charset.Charset; import com.fasterxml.jackson.core.type.TypeReference;
import java.util.*; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
/** /**
* jackson header mapper for RocketMQ. * jackson header mapper for RocketMQ. Header types are added to a special header
* Header types are added to a special header {@link #JSON_TYPES}. * {@link #JSON_TYPES}.
* *
* @author caotc * @author caotc
* @date 2019-08-22 * @date 2019-08-22
* @since 2.1.1 * @since 2.1.1
*/ */
public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper{ public class JacksonRocketMQHeaderMapper extends AbstractRocketMQHeaderMapper {
private final static Logger log = LoggerFactory private final static Logger log = LoggerFactory
.getLogger(JacksonRocketMQHeaderMapper.class); .getLogger(JacksonRocketMQHeaderMapper.class);
private static final List<String> DEFAULT_TRUSTED_PACKAGES =
Arrays.asList(
"java.lang",
"java.net",
"java.util",
"org.springframework.util"
);
/** private static final List<String> DEFAULT_TRUSTED_PACKAGES = Arrays
* Header name for java types of other headers. .asList("java.lang", "java.net", "java.util", "org.springframework.util");
*/
public static final String JSON_TYPES = "spring_json_header_types";
private final ObjectMapper objectMapper; /**
private final Set<String> trustedPackages = new LinkedHashSet<>(DEFAULT_TRUSTED_PACKAGES); * Header name for java types of other headers.
*/
public static final String JSON_TYPES = "spring_json_header_types";
public JacksonRocketMQHeaderMapper(ObjectMapper objectMapper) { private final ObjectMapper objectMapper;
this.objectMapper = objectMapper; private final Set<String> trustedPackages = new LinkedHashSet<>(
} DEFAULT_TRUSTED_PACKAGES);
public JacksonRocketMQHeaderMapper(Charset charset, ObjectMapper objectMapper) { public JacksonRocketMQHeaderMapper(ObjectMapper objectMapper) {
super(charset); this.objectMapper = objectMapper;
this.objectMapper = objectMapper; }
}
@Override public JacksonRocketMQHeaderMapper(Charset charset, ObjectMapper objectMapper) {
public Map<String,String> fromHeaders(MessageHeaders headers) { super(charset);
final Map<String, String> target = Maps.newHashMap(); this.objectMapper = objectMapper;
final Map<String, String> jsonHeaders = Maps.newHashMap(); }
headers.forEach((key, value) -> {
if (matches(key)) {
if (value instanceof String) {
target.put(key, (String) value);
}else {
try {
String className = value.getClass().getName();
target.put(key, objectMapper.writeValueAsString(value));
jsonHeaders.put(key, className);
}
catch (Exception e) {
log.debug("Could not map " + key + " with type " + value.getClass().getName(),e);
}
}
}
});
if (jsonHeaders.size() > 0) {
try {
target.put(JSON_TYPES, objectMapper.writeValueAsString(jsonHeaders));
}
catch (IllegalStateException | JsonProcessingException e) {
log.error( "Could not add json types header",e);
}
}
return target;
}
@Override @Override
public MessageHeaders toHeaders(Map<String,String> source) { public Map<String, String> fromHeaders(MessageHeaders headers) {
final Map<String, Object> target = Maps.newHashMap(); final Map<String, String> target = Maps.newHashMap();
final Map<String, String> jsonTypes = decodeJsonTypes(source); final Map<String, String> jsonHeaders = Maps.newHashMap();
source.forEach((key,value) -> { headers.forEach((key, value) -> {
if (matches(key) && !(key.equals(JSON_TYPES))) { if (matches(key)) {
if (jsonTypes != null && jsonTypes.containsKey(key)) { if (value instanceof String) {
Class<?> type = Object.class; target.put(key, (String) value);
String requestedType = jsonTypes.get(key); }
boolean trusted = trusted(requestedType); else {
if (trusted) { try {
try { String className = value.getClass().getName();
type = ClassUtils.forName(requestedType, null); target.put(key, objectMapper.writeValueAsString(value));
}catch (Exception e) { jsonHeaders.put(key, className);
log.error( "Could not load class for header: " + key,e); }
} catch (Exception e) {
} log.debug("Could not map " + key + " with type "
+ value.getClass().getName(), e);
}
}
}
});
if (jsonHeaders.size() > 0) {
try {
target.put(JSON_TYPES, objectMapper.writeValueAsString(jsonHeaders));
}
catch (IllegalStateException | JsonProcessingException e) {
log.error("Could not add json types header", e);
}
}
return target;
}
if (trusted) { @Override
try { public MessageHeaders toHeaders(Map<String, String> source) {
Object val = decodeValue(value, type); final Map<String, Object> target = Maps.newHashMap();
target.put(key, val); final Map<String, String> jsonTypes = decodeJsonTypes(source);
} source.forEach((key, value) -> {
catch (IOException e) { if (matches(key) && !(key.equals(JSON_TYPES))) {
log.error("Could not decode json type: " + value + " for key: " if (jsonTypes != null && jsonTypes.containsKey(key)) {
+ key,e); Class<?> type = Object.class;
target.put(key, value); String requestedType = jsonTypes.get(key);
} boolean trusted = trusted(requestedType);
}else { if (trusted) {
target.put(key, new NonTrustedHeaderType(value, requestedType)); try {
} type = ClassUtils.forName(requestedType, null);
}else { }
target.put(key, value); catch (Exception e) {
} log.error("Could not load class for header: " + key, e);
} }
}); }
return new MessageHeaders(target);
}
/** if (trusted) {
* @param packagesToTrust the packages to trust. try {
* @see #addTrustedPackages(Collection) Object val = decodeValue(value, type);
*/ target.put(key, val);
public void addTrustedPackages(String... packagesToTrust) { }
if(Objects.nonNull(packagesToTrust)){ catch (IOException e) {
addTrustedPackages(Arrays.asList(packagesToTrust)); log.error("Could not decode json type: " + value
} + " for key: " + key, e);
} target.put(key, value);
}
}
else {
target.put(key, new NonTrustedHeaderType(value, requestedType));
}
}
else {
target.put(key, value);
}
}
});
return new MessageHeaders(target);
}
/** /**
* Add packages to the trusted packages list (default {@code java.util, java.lang}) used * @param packagesToTrust the packages to trust.
* when constructing objects from JSON. * @see #addTrustedPackages(Collection)
* If any of the supplied packages is {@code "*"}, all packages are trusted. */
* If a class for a non-trusted package is encountered, the header is returned to the public void addTrustedPackages(String... packagesToTrust) {
* application with value of type {@link NonTrustedHeaderType}. if (Objects.nonNull(packagesToTrust)) {
* @param packagesToTrust the packages to trust. addTrustedPackages(Arrays.asList(packagesToTrust));
*/ }
public void addTrustedPackages(Collection<String> packagesToTrust) { }
if (packagesToTrust != null) {
for (String whiteList : packagesToTrust) {
if ("*".equals(whiteList)) {
this.trustedPackages.clear();
break;
}
else {
this.trustedPackages.add(whiteList);
}
}
}
}
public Set<String> getTrustedPackages() { /**
return this.trustedPackages; * Add packages to the trusted packages list (default {@code java.util, java.lang})
} * used when constructing objects from JSON. If any of the supplied packages is
* {@code "*"}, all packages are trusted. If a class for a non-trusted package is
* encountered, the header is returned to the application with value of type
* {@link NonTrustedHeaderType}.
* @param packagesToTrust the packages to trust.
*/
public void addTrustedPackages(Collection<String> packagesToTrust) {
if (packagesToTrust != null) {
for (String whiteList : packagesToTrust) {
if ("*".equals(whiteList)) {
this.trustedPackages.clear();
break;
}
else {
this.trustedPackages.add(whiteList);
}
}
}
}
public ObjectMapper getObjectMapper() { public Set<String> getTrustedPackages() {
return objectMapper; return this.trustedPackages;
} }
private Object decodeValue(String jsonString, Class<?> type) throws IOException, LinkageError { public ObjectMapper getObjectMapper() {
Object value = objectMapper.readValue(jsonString, type); return objectMapper;
if (type.equals(NonTrustedHeaderType.class)) { }
// Upstream NTHT propagated; may be trusted here...
NonTrustedHeaderType nth = (NonTrustedHeaderType) value;
if (trusted(nth.getUntrustedType())) {
try {
value = objectMapper.readValue(nth.getHeaderValue(),
ClassUtils.forName(nth.getUntrustedType(), null));
}
catch (Exception e) {
log.error("Could not decode header: " + nth,e);
}
}
}
return value;
}
@Nullable private Object decodeValue(String jsonString, Class<?> type)
private Map<String, String> decodeJsonTypes(Map<String, String> source) { throws IOException, LinkageError {
if(source.containsKey(JSON_TYPES)){ Object value = objectMapper.readValue(jsonString, type);
String value=source.get(JSON_TYPES); if (type.equals(NonTrustedHeaderType.class)) {
try { // Upstream NTHT propagated; may be trusted here...
return objectMapper.readValue(value,new TypeReference<Map<String,String>>(){}); NonTrustedHeaderType nth = (NonTrustedHeaderType) value;
} if (trusted(nth.getUntrustedType())) {
catch (IOException e) { try {
log.error("Could not decode json types: " + value,e); value = objectMapper.readValue(nth.getHeaderValue(),
} ClassUtils.forName(nth.getUntrustedType(), null));
} }
return null; catch (Exception e) {
} log.error("Could not decode header: " + nth, e);
}
}
}
return value;
}
protected boolean trusted(String requestedType) { @Nullable
if (requestedType.equals(NonTrustedHeaderType.class.getName())) { private Map<String, String> decodeJsonTypes(Map<String, String> source) {
return true; if (source.containsKey(JSON_TYPES)) {
} String value = source.get(JSON_TYPES);
if (!this.trustedPackages.isEmpty()) { try {
int lastDot = requestedType.lastIndexOf('.'); return objectMapper.readValue(value,
if (lastDot < 0) { new TypeReference<Map<String, String>>() {
return false; });
} }
String packageName = requestedType.substring(0, lastDot); catch (IOException e) {
for (String trustedPackage : this.trustedPackages) { log.error("Could not decode json types: " + value, e);
if (packageName.equals(trustedPackage) || packageName.startsWith(trustedPackage + ".")) { }
return true; }
} return null;
} }
return false;
}
return true;
}
/** protected boolean trusted(String requestedType) {
* Represents a header that could not be decoded due to an untrusted type. if (requestedType.equals(NonTrustedHeaderType.class.getName())) {
*/ return true;
public static class NonTrustedHeaderType { }
if (!this.trustedPackages.isEmpty()) {
int lastDot = requestedType.lastIndexOf('.');
if (lastDot < 0) {
return false;
}
String packageName = requestedType.substring(0, lastDot);
for (String trustedPackage : this.trustedPackages) {
if (packageName.equals(trustedPackage)
|| packageName.startsWith(trustedPackage + ".")) {
return true;
}
}
return false;
}
return true;
}
private String headerValue; /**
* Represents a header that could not be decoded due to an untrusted type.
*/
public static class NonTrustedHeaderType {
private String untrustedType; private String headerValue;
public NonTrustedHeaderType() { private String untrustedType;
super();
}
NonTrustedHeaderType(String headerValue, String untrustedType) { public NonTrustedHeaderType() {
this.headerValue = headerValue; super();
this.untrustedType = untrustedType; }
}
NonTrustedHeaderType(String headerValue, String untrustedType) {
this.headerValue = headerValue;
this.untrustedType = untrustedType;
}
public void setHeaderValue(String headerValue) { public void setHeaderValue(String headerValue) {
this.headerValue = headerValue; this.headerValue = headerValue;
} }
public String getHeaderValue() { public String getHeaderValue() {
return this.headerValue; return this.headerValue;
} }
public void setUntrustedType(String untrustedType) { public void setUntrustedType(String untrustedType) {
this.untrustedType = untrustedType; this.untrustedType = untrustedType;
} }
public String getUntrustedType() { public String getUntrustedType() {
return this.untrustedType; return this.untrustedType;
} }
@Override @Override
public String toString() { public String toString() {
return "NonTrustedHeaderType [headerValue=" + headerValue return "NonTrustedHeaderType [headerValue=" + headerValue + ", untrustedType="
+ ", untrustedType=" + this.untrustedType + "]"; + this.untrustedType + "]";
} }
} }
} }

@ -1,27 +1,28 @@
package com.alibaba.cloud.stream.binder.rocketmq.support; package com.alibaba.cloud.stream.binder.rocketmq.support;
import org.springframework.messaging.MessageHeaders;
import java.util.Map; import java.util.Map;
import org.springframework.messaging.MessageHeaders;
/** /**
* header value mapper for RocketMQ * header value mapper for RocketMQ
* *
* @author caotc * @author caotc
* @date 2019-08-22 * @date 2019-08-22
* @since 2.1.1 * @since 2.1.1
*/ */
public interface RocketMQHeaderMapper { public interface RocketMQHeaderMapper {
/** /**
* Map from the given {@link MessageHeaders} to the specified target message. * Map from the given {@link MessageHeaders} to the specified target message.
* @param headers the abstracted MessageHeaders. * @param headers the abstracted MessageHeaders.
* @return the native target message. * @return the native target message.
*/ */
Map<String,String> fromHeaders(MessageHeaders headers); Map<String, String> fromHeaders(MessageHeaders headers);
/**
* Map from the given target message to abstracted {@link MessageHeaders}. /**
* @param source the native target message. * Map from the given target message to abstracted {@link MessageHeaders}.
* @return the target headers. * @param source the native target message.
*/ * @return the target headers.
MessageHeaders toHeaders(Map<String,String> source); */
MessageHeaders toHeaders(Map<String, String> source);
} }

Loading…
Cancel
Save