Fixed - codec provided in Redisson configuration should be used for attribute messages serialization. #1905

pull/1979/head
Nikita Koksharov 6 years ago
parent d8ae21e540
commit ee51283731

@ -15,13 +15,15 @@
*/ */
package org.redisson.tomcat; package org.redisson.tomcat;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;
import org.apache.catalina.util.CustomObjectInputStream; import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/** /**
* *
@ -50,22 +52,29 @@ public class AttributeMessage implements Serializable {
return nodeId; return nodeId;
} }
protected byte[] toByteArray(Object value) throws IOException { protected byte[] toByteArray(Encoder encoder, Object value) throws IOException {
if (value == null) { if (value == null) {
return null; return null;
} }
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bos); ByteBuf buf = encoder.encode(value);
out.writeObject(value); try {
out.flush(); return ByteBufUtil.getBytes(buf);
return bos.toByteArray(); } finally {
buf.release();
}
} }
protected Object toObject(ClassLoader classLoader, byte[] value) throws IOException, ClassNotFoundException { protected Object toObject(Decoder<?> decoder, byte[] value) throws IOException, ClassNotFoundException {
if (value == null) { if (value == null) {
return null; return null;
} }
CustomObjectInputStream in = new CustomObjectInputStream(new ByteArrayInputStream(value), classLoader);
return in.readObject(); ByteBuf buf = Unpooled.wrappedBuffer(value);
try {
return decoder.decode(buf, null);
} finally {
buf.release();
}
} }
} }

@ -17,6 +17,9 @@ package org.redisson.tomcat;
import java.io.IOException; import java.io.IOException;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -30,18 +33,18 @@ public class AttributeUpdateMessage extends AttributeMessage {
public AttributeUpdateMessage() { public AttributeUpdateMessage() {
} }
public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value) throws IOException { public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
this.name = name; this.name = name;
this.value = toByteArray(value); this.value = toByteArray(encoder, value);
} }
public String getName() { public String getName() {
return name; return name;
} }
public Object getValue(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Object getValue(Decoder<?> decoder) throws IOException, ClassNotFoundException {
return toObject(classLoader, value); return toObject(decoder, value);
} }
} }

@ -20,6 +20,9 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -32,25 +35,25 @@ public class AttributesPutAllMessage extends AttributeMessage {
public AttributesPutAllMessage() { public AttributesPutAllMessage() {
} }
public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs) throws IOException { public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
if (attrs != null) { if (attrs != null) {
this.attrs = new HashMap<String, byte[]>(); this.attrs = new HashMap<String, byte[]>();
for (Entry<String, Object> entry: attrs.entrySet()) { for (Entry<String, Object> entry: attrs.entrySet()) {
this.attrs.put(entry.getKey(), toByteArray(entry.getValue())); this.attrs.put(entry.getKey(), toByteArray(encoder, entry.getValue()));
} }
} else { } else {
this.attrs = null; this.attrs = null;
} }
} }
public Map<String, Object> getAttrs(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Map<String, Object> getAttrs(Decoder<?> decoder) throws IOException, ClassNotFoundException {
if (attrs == null) { if (attrs == null) {
return null; return null;
} }
Map<String, Object> result = new HashMap<String, Object>(); Map<String, Object> result = new HashMap<String, Object>();
for (Entry<String, byte[]> entry: attrs.entrySet()) { for (Entry<String, byte[]> entry: attrs.entrySet()) {
result.put(entry.getKey(), toObject(classLoader, entry.getValue())); result.put(entry.getKey(), toObject(decoder, entry.getValue()));
} }
return result; return result;
} }

@ -187,7 +187,7 @@ public class RedissonSession extends StandardSession {
map.put(entry.getKey(), entry.getValue()); map.put(entry.getKey(), entry.getValue());
} }
try { try {
return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map); return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map, this.map.getCodec().getMapValueEncoder());
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -207,7 +207,7 @@ public class RedissonSession extends StandardSession {
map.fastPut(name, value); map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY) {
try { try {
topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value)); topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }

@ -235,6 +235,16 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
applicationClassLoader = getClass().getClassLoader(); applicationClassLoader = getClass().getClassLoader();
} }
Codec codec = redisson.getConfig().getCodec();
Codec codecToUse;
try {
codecToUse = codec.getClass()
.getConstructor(ClassLoader.class, codec.getClass())
.newInstance(applicationClassLoader, codec);
} catch (Exception e) {
throw new LifecycleException(e);
}
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this)); getEngine().getPipeline().addValve(new UpdateValve(this));
} }
@ -259,14 +269,14 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
if (msg instanceof AttributesPutAllMessage) { if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg; AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs(applicationClassLoader).entrySet()) { for (Entry<String, Object> entry : m.getAttrs(codecToUse.getMapValueDecoder()).entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true); session.superSetAttribute(entry.getKey(), entry.getValue(), true);
} }
} }
if (msg instanceof AttributeUpdateMessage) { if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg; AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(applicationClassLoader), true); session.superSetAttribute(m.getName(), m.getValue(codecToUse.getMapValueDecoder()), true);
} }
} }
} catch (Exception e) { } catch (Exception e) {
@ -294,15 +304,6 @@ public class RedissonSessionManager extends ManagerBase implements Lifecycle {
} }
try { try {
try {
Config c = new Config(config);
Codec codec = c.getCodec().getClass().getConstructor(ClassLoader.class)
.newInstance(Thread.currentThread().getContextClassLoader());
config.setCodec(codec);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize codec with ClassLoader parameter", e);
}
return Redisson.create(config); return Redisson.create(config);
} catch (Exception e) { } catch (Exception e) {
throw new LifecycleException(e); throw new LifecycleException(e);

@ -15,13 +15,15 @@
*/ */
package org.redisson.tomcat; package org.redisson.tomcat;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;
import org.apache.catalina.util.CustomObjectInputStream; import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/** /**
* *
@ -50,22 +52,29 @@ public class AttributeMessage implements Serializable {
return nodeId; return nodeId;
} }
protected byte[] toByteArray(Object value) throws IOException { protected byte[] toByteArray(Encoder encoder, Object value) throws IOException {
if (value == null) { if (value == null) {
return null; return null;
} }
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bos); ByteBuf buf = encoder.encode(value);
out.writeObject(value); try {
out.flush(); return ByteBufUtil.getBytes(buf);
return bos.toByteArray(); } finally {
buf.release();
}
} }
protected Object toObject(ClassLoader classLoader, byte[] value) throws IOException, ClassNotFoundException { protected Object toObject(Decoder<?> decoder, byte[] value) throws IOException, ClassNotFoundException {
if (value == null) { if (value == null) {
return null; return null;
} }
CustomObjectInputStream in = new CustomObjectInputStream(new ByteArrayInputStream(value), classLoader);
return in.readObject(); ByteBuf buf = Unpooled.wrappedBuffer(value);
try {
return decoder.decode(buf, null);
} finally {
buf.release();
}
} }
} }

@ -17,6 +17,9 @@ package org.redisson.tomcat;
import java.io.IOException; import java.io.IOException;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -30,18 +33,18 @@ public class AttributeUpdateMessage extends AttributeMessage {
public AttributeUpdateMessage() { public AttributeUpdateMessage() {
} }
public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value) throws IOException { public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
this.name = name; this.name = name;
this.value = toByteArray(value); this.value = toByteArray(encoder, value);
} }
public String getName() { public String getName() {
return name; return name;
} }
public Object getValue(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Object getValue(Decoder<?> decoder) throws IOException, ClassNotFoundException {
return toObject(classLoader, value); return toObject(decoder, value);
} }
} }

@ -20,6 +20,9 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -32,25 +35,25 @@ public class AttributesPutAllMessage extends AttributeMessage {
public AttributesPutAllMessage() { public AttributesPutAllMessage() {
} }
public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs) throws IOException { public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
if (attrs != null) { if (attrs != null) {
this.attrs = new HashMap<String, byte[]>(); this.attrs = new HashMap<String, byte[]>();
for (Entry<String, Object> entry: attrs.entrySet()) { for (Entry<String, Object> entry: attrs.entrySet()) {
this.attrs.put(entry.getKey(), toByteArray(entry.getValue())); this.attrs.put(entry.getKey(), toByteArray(encoder, entry.getValue()));
} }
} else { } else {
this.attrs = null; this.attrs = null;
} }
} }
public Map<String, Object> getAttrs(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Map<String, Object> getAttrs(Decoder<?> decoder) throws IOException, ClassNotFoundException {
if (attrs == null) { if (attrs == null) {
return null; return null;
} }
Map<String, Object> result = new HashMap<String, Object>(); Map<String, Object> result = new HashMap<String, Object>();
for (Entry<String, byte[]> entry: attrs.entrySet()) { for (Entry<String, byte[]> entry: attrs.entrySet()) {
result.put(entry.getKey(), toObject(classLoader, entry.getValue())); result.put(entry.getKey(), toObject(decoder, entry.getValue()));
} }
return result; return result;
} }

@ -187,7 +187,7 @@ public class RedissonSession extends StandardSession {
map.put(entry.getKey(), entry.getValue()); map.put(entry.getKey(), entry.getValue());
} }
try { try {
return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map); return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map, this.map.getCodec().getMapValueEncoder());
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -207,7 +207,7 @@ public class RedissonSession extends StandardSession {
map.fastPut(name, value); map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY) {
try { try {
topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value)); topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }

@ -36,6 +36,7 @@ import org.redisson.api.RMap;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec; import org.redisson.codec.CompositeCodec;
import org.redisson.config.Config; import org.redisson.config.Config;
@ -214,6 +215,16 @@ public class RedissonSessionManager extends ManagerBase {
applicationClassLoader = getClass().getClassLoader(); applicationClassLoader = getClass().getClassLoader();
} }
Codec codec = redisson.getConfig().getCodec();
Codec codecToUse;
try {
codecToUse = codec.getClass()
.getConstructor(ClassLoader.class, codec.getClass())
.newInstance(applicationClassLoader, codec);
} catch (Exception e) {
throw new LifecycleException(e);
}
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this)); getEngine().getPipeline().addValve(new UpdateValve(this));
} }
@ -238,14 +249,14 @@ public class RedissonSessionManager extends ManagerBase {
if (msg instanceof AttributesPutAllMessage) { if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg; AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs(applicationClassLoader).entrySet()) { for (Entry<String, Object> entry : m.getAttrs(codecToUse.getMapValueDecoder()).entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true); session.superSetAttribute(entry.getKey(), entry.getValue(), true);
} }
} }
if (msg instanceof AttributeUpdateMessage) { if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg; AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(applicationClassLoader), true); session.superSetAttribute(m.getName(), m.getValue(codecToUse.getMapValueDecoder()), true);
} }
} }
} catch (Exception e) { } catch (Exception e) {

@ -15,13 +15,15 @@
*/ */
package org.redisson.tomcat; package org.redisson.tomcat;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;
import org.apache.catalina.util.CustomObjectInputStream; import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/** /**
* *
@ -50,22 +52,29 @@ public class AttributeMessage implements Serializable {
return nodeId; return nodeId;
} }
protected byte[] toByteArray(Object value) throws IOException { protected byte[] toByteArray(Encoder encoder, Object value) throws IOException {
if (value == null) { if (value == null) {
return null; return null;
} }
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bos); ByteBuf buf = encoder.encode(value);
out.writeObject(value); try {
out.flush(); return ByteBufUtil.getBytes(buf);
return bos.toByteArray(); } finally {
buf.release();
}
} }
protected Object toObject(ClassLoader classLoader, byte[] value) throws IOException, ClassNotFoundException { protected Object toObject(Decoder<?> decoder, byte[] value) throws IOException, ClassNotFoundException {
if (value == null) { if (value == null) {
return null; return null;
} }
CustomObjectInputStream in = new CustomObjectInputStream(new ByteArrayInputStream(value), classLoader);
return in.readObject(); ByteBuf buf = Unpooled.wrappedBuffer(value);
try {
return decoder.decode(buf, null);
} finally {
buf.release();
}
} }
} }

@ -17,6 +17,9 @@ package org.redisson.tomcat;
import java.io.IOException; import java.io.IOException;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -30,18 +33,18 @@ public class AttributeUpdateMessage extends AttributeMessage {
public AttributeUpdateMessage() { public AttributeUpdateMessage() {
} }
public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value) throws IOException { public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
this.name = name; this.name = name;
this.value = toByteArray(value); this.value = toByteArray(encoder, value);
} }
public String getName() { public String getName() {
return name; return name;
} }
public Object getValue(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Object getValue(Decoder<?> decoder) throws IOException, ClassNotFoundException {
return toObject(classLoader, value); return toObject(decoder, value);
} }
} }

@ -20,6 +20,9 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -32,25 +35,25 @@ public class AttributesPutAllMessage extends AttributeMessage {
public AttributesPutAllMessage() { public AttributesPutAllMessage() {
} }
public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs) throws IOException { public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
if (attrs != null) { if (attrs != null) {
this.attrs = new HashMap<String, byte[]>(); this.attrs = new HashMap<String, byte[]>();
for (Entry<String, Object> entry: attrs.entrySet()) { for (Entry<String, Object> entry: attrs.entrySet()) {
this.attrs.put(entry.getKey(), toByteArray(entry.getValue())); this.attrs.put(entry.getKey(), toByteArray(encoder, entry.getValue()));
} }
} else { } else {
this.attrs = null; this.attrs = null;
} }
} }
public Map<String, Object> getAttrs(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Map<String, Object> getAttrs(Decoder<?> decoder) throws IOException, ClassNotFoundException {
if (attrs == null) { if (attrs == null) {
return null; return null;
} }
Map<String, Object> result = new HashMap<String, Object>(); Map<String, Object> result = new HashMap<String, Object>();
for (Entry<String, byte[]> entry: attrs.entrySet()) { for (Entry<String, byte[]> entry: attrs.entrySet()) {
result.put(entry.getKey(), toObject(classLoader, entry.getValue())); result.put(entry.getKey(), toObject(decoder, entry.getValue()));
} }
return result; return result;
} }

@ -187,7 +187,7 @@ public class RedissonSession extends StandardSession {
map.put(entry.getKey(), entry.getValue()); map.put(entry.getKey(), entry.getValue());
} }
try { try {
return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map); return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map, this.map.getCodec().getMapValueEncoder());
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -207,7 +207,7 @@ public class RedissonSession extends StandardSession {
map.fastPut(name, value); map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY) {
try { try {
topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value)); topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }

@ -35,6 +35,7 @@ import org.redisson.api.RMap;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec; import org.redisson.codec.CompositeCodec;
import org.redisson.config.Config; import org.redisson.config.Config;
@ -213,6 +214,16 @@ public class RedissonSessionManager extends ManagerBase {
applicationClassLoader = getClass().getClassLoader(); applicationClassLoader = getClass().getClassLoader();
} }
Codec codec = redisson.getConfig().getCodec();
Codec codecToUse;
try {
codecToUse = codec.getClass()
.getConstructor(ClassLoader.class, codec.getClass())
.newInstance(applicationClassLoader, codec);
} catch (Exception e) {
throw new LifecycleException(e);
}
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this)); getEngine().getPipeline().addValve(new UpdateValve(this));
} }
@ -237,14 +248,14 @@ public class RedissonSessionManager extends ManagerBase {
if (msg instanceof AttributesPutAllMessage) { if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg; AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs(applicationClassLoader).entrySet()) { for (Entry<String, Object> entry : m.getAttrs(codecToUse.getMapValueDecoder()).entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true); session.superSetAttribute(entry.getKey(), entry.getValue(), true);
} }
} }
if (msg instanceof AttributeUpdateMessage) { if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg; AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(applicationClassLoader), true); session.superSetAttribute(m.getName(), m.getValue(codecToUse.getMapValueDecoder()), true);
} }
} }
} catch (Exception e) { } catch (Exception e) {

@ -15,13 +15,15 @@
*/ */
package org.redisson.tomcat; package org.redisson.tomcat;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable; import java.io.Serializable;
import org.apache.catalina.util.CustomObjectInputStream; import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
/** /**
* *
@ -50,22 +52,29 @@ public class AttributeMessage implements Serializable {
return nodeId; return nodeId;
} }
protected byte[] toByteArray(Object value) throws IOException { protected byte[] toByteArray(Encoder encoder, Object value) throws IOException {
if (value == null) { if (value == null) {
return null; return null;
} }
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(bos); ByteBuf buf = encoder.encode(value);
out.writeObject(value); try {
out.flush(); return ByteBufUtil.getBytes(buf);
return bos.toByteArray(); } finally {
buf.release();
}
} }
protected Object toObject(ClassLoader classLoader, byte[] value) throws IOException, ClassNotFoundException { protected Object toObject(Decoder<?> decoder, byte[] value) throws IOException, ClassNotFoundException {
if (value == null) { if (value == null) {
return null; return null;
} }
CustomObjectInputStream in = new CustomObjectInputStream(new ByteArrayInputStream(value), classLoader);
return in.readObject(); ByteBuf buf = Unpooled.wrappedBuffer(value);
try {
return decoder.decode(buf, null);
} finally {
buf.release();
}
} }
} }

@ -17,6 +17,9 @@ package org.redisson.tomcat;
import java.io.IOException; import java.io.IOException;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -30,18 +33,18 @@ public class AttributeUpdateMessage extends AttributeMessage {
public AttributeUpdateMessage() { public AttributeUpdateMessage() {
} }
public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value) throws IOException { public AttributeUpdateMessage(String nodeId, String sessionId, String name, Object value, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
this.name = name; this.name = name;
this.value = toByteArray(value); this.value = toByteArray(encoder, value);
} }
public String getName() { public String getName() {
return name; return name;
} }
public Object getValue(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Object getValue(Decoder<?> decoder) throws IOException, ClassNotFoundException {
return toObject(classLoader, value); return toObject(decoder, value);
} }
} }

@ -20,6 +20,9 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
/** /**
* *
* @author Nikita Koksharov * @author Nikita Koksharov
@ -32,25 +35,25 @@ public class AttributesPutAllMessage extends AttributeMessage {
public AttributesPutAllMessage() { public AttributesPutAllMessage() {
} }
public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs) throws IOException { public AttributesPutAllMessage(String nodeId, String sessionId, Map<String, Object> attrs, Encoder encoder) throws IOException {
super(nodeId, sessionId); super(nodeId, sessionId);
if (attrs != null) { if (attrs != null) {
this.attrs = new HashMap<String, byte[]>(); this.attrs = new HashMap<String, byte[]>();
for (Entry<String, Object> entry: attrs.entrySet()) { for (Entry<String, Object> entry: attrs.entrySet()) {
this.attrs.put(entry.getKey(), toByteArray(entry.getValue())); this.attrs.put(entry.getKey(), toByteArray(encoder, entry.getValue()));
} }
} else { } else {
this.attrs = null; this.attrs = null;
} }
} }
public Map<String, Object> getAttrs(ClassLoader classLoader) throws IOException, ClassNotFoundException { public Map<String, Object> getAttrs(Decoder<?> decoder) throws IOException, ClassNotFoundException {
if (attrs == null) { if (attrs == null) {
return null; return null;
} }
Map<String, Object> result = new HashMap<String, Object>(); Map<String, Object> result = new HashMap<String, Object>();
for (Entry<String, byte[]> entry: attrs.entrySet()) { for (Entry<String, byte[]> entry: attrs.entrySet()) {
result.put(entry.getKey(), toObject(classLoader, entry.getValue())); result.put(entry.getKey(), toObject(decoder, entry.getValue()));
} }
return result; return result;
} }

@ -187,7 +187,7 @@ public class RedissonSession extends StandardSession {
map.put(entry.getKey(), entry.getValue()); map.put(entry.getKey(), entry.getValue());
} }
try { try {
return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map); return new AttributesPutAllMessage(redissonManager.getNodeId(), getId(), map, this.map.getCodec().getMapValueEncoder());
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
@ -207,7 +207,7 @@ public class RedissonSession extends StandardSession {
map.fastPut(name, value); map.fastPut(name, value);
if (readMode == ReadMode.MEMORY) { if (readMode == ReadMode.MEMORY) {
try { try {
topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value)); topic.publish(new AttributeUpdateMessage(redissonManager.getNodeId(), getId(), name, value, this.map.getCodec().getMapValueEncoder()));
} catch (IOException e) { } catch (IOException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }

@ -35,6 +35,7 @@ import org.redisson.api.RMap;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener; import org.redisson.api.listener.MessageListener;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec; import org.redisson.client.codec.StringCodec;
import org.redisson.codec.CompositeCodec; import org.redisson.codec.CompositeCodec;
import org.redisson.config.Config; import org.redisson.config.Config;
@ -213,6 +214,16 @@ public class RedissonSessionManager extends ManagerBase {
applicationClassLoader = getClass().getClassLoader(); applicationClassLoader = getClass().getClassLoader();
} }
Codec codec = redisson.getConfig().getCodec();
Codec codecToUse;
try {
codecToUse = codec.getClass()
.getConstructor(ClassLoader.class, codec.getClass())
.newInstance(applicationClassLoader, codec);
} catch (Exception e) {
throw new LifecycleException(e);
}
if (updateMode == UpdateMode.AFTER_REQUEST) { if (updateMode == UpdateMode.AFTER_REQUEST) {
getEngine().getPipeline().addValve(new UpdateValve(this)); getEngine().getPipeline().addValve(new UpdateValve(this));
} }
@ -237,14 +248,14 @@ public class RedissonSessionManager extends ManagerBase {
if (msg instanceof AttributesPutAllMessage) { if (msg instanceof AttributesPutAllMessage) {
AttributesPutAllMessage m = (AttributesPutAllMessage) msg; AttributesPutAllMessage m = (AttributesPutAllMessage) msg;
for (Entry<String, Object> entry : m.getAttrs(applicationClassLoader).entrySet()) { for (Entry<String, Object> entry : m.getAttrs(codecToUse.getMapValueDecoder()).entrySet()) {
session.superSetAttribute(entry.getKey(), entry.getValue(), true); session.superSetAttribute(entry.getKey(), entry.getValue(), true);
} }
} }
if (msg instanceof AttributeUpdateMessage) { if (msg instanceof AttributeUpdateMessage) {
AttributeUpdateMessage m = (AttributeUpdateMessage)msg; AttributeUpdateMessage m = (AttributeUpdateMessage)msg;
session.superSetAttribute(m.getName(), m.getValue(applicationClassLoader), true); session.superSetAttribute(m.getName(), m.getValue(codecToUse.getMapValueDecoder()), true);
} }
} }
} catch (Exception e) { } catch (Exception e) {

Loading…
Cancel
Save