RedissonCodec interface introduced

pull/6/head
Nikita 11 years ago
parent 9ce68282ea
commit c860bb921d

@ -20,12 +20,12 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import com.lambdaworks.redis.codec.JsonJacksonCodec;
import com.lambdaworks.redis.codec.RedisCodec;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.codec.RedissonCodec;
public class Config {
private RedisCodec codec = new JsonJacksonCodec();
private RedissonCodec codec = new JsonJacksonCodec();
private int subscriptionsPerConnection = 5;
@ -46,10 +46,10 @@ public class Config {
setAddresses(oldConf.getAddresses());
}
public void setCodec(RedisCodec codec) {
public void setCodec(RedissonCodec codec) {
this.codec = codec;
}
public RedisCodec getCodec() {
public RedissonCodec getCodec() {
return codec;
}

@ -1,4 +1,4 @@
package com.lambdaworks.redis.codec;
package org.redisson.codec;
import java.nio.ByteBuffer;
@ -13,7 +13,7 @@ import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.jsontype.TypeResolverBuilder;
public class JsonJacksonCodec extends RedisCodec<Object, Object> {
public class JsonJacksonCodec implements RedissonCodec {
private final ObjectMapper objectMapper = new ObjectMapper();

@ -0,0 +1,37 @@
package org.redisson.codec;
import java.nio.ByteBuffer;
import com.lambdaworks.redis.codec.RedisCodec;
public class RedisCodecWrapper extends RedisCodec<Object, Object> {
private final RedissonCodec redissonCodec;
public RedisCodecWrapper(RedissonCodec redissonCodec) {
this.redissonCodec = redissonCodec;
}
@Override
public Object decodeKey(ByteBuffer bytes) {
return redissonCodec.decodeKey(bytes);
}
@Override
public Object decodeValue(ByteBuffer bytes) {
return redissonCodec.decodeValue(bytes);
}
@Override
public byte[] encodeKey(Object key) {
return redissonCodec.encodeKey(key);
}
@Override
public byte[] encodeValue(Object value) {
return redissonCodec.encodeValue(value);
}
}

@ -0,0 +1,43 @@
package org.redisson.codec;
import java.nio.ByteBuffer;
public interface RedissonCodec {
/**
* Decode the key output by redis.
*
* @param bytes Raw bytes of the key.
*
* @return The decoded key.
*/
public abstract Object decodeKey(ByteBuffer bytes);
/**
* Decode the value output by redis.
*
* @param bytes Raw bytes of the value.
*
* @return The decoded value.
*/
public abstract Object decodeValue(ByteBuffer bytes);
/**
* Encode the key for output to redis.
*
* @param key Key.
*
* @return The encoded key.
*/
public abstract byte[] encodeKey(Object key);
/**
* Encode the value for output to redis.
*
* @param value Value.
*
* @return The encoded value.
*/
public abstract byte[] encodeValue(Object value);
}

@ -1,4 +1,4 @@
package com.lambdaworks.redis.codec;
package org.redisson.codec;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -6,7 +6,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
public class StreamCodec extends RedisCodec<Object, Object> {
public class StreamCodec implements RedissonCodec {
@Override
public Object decodeKey(ByteBuffer bytes) {

@ -21,9 +21,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import org.redisson.Config;
import org.redisson.codec.RedisCodecWrapper;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.RedisPubSubConnection;
import com.lambdaworks.redis.pubsub.RedisPubSubListener;
@ -87,11 +89,13 @@ public class ConnectionManager {
private final Semaphore activeConnections;
private final RedisClient redisClient;
private final RedisCodec codec;
private final Config config;
public ConnectionManager(Config config) {
URI address = config.getAddresses().iterator().next();
redisClient = new RedisClient(address.getHost(), address.getPort());
codec = new RedisCodecWrapper(config.getCodec());
activeConnections = new Semaphore(config.getConnectionPoolSize());
this.config = config;
}
@ -100,7 +104,7 @@ public class ConnectionManager {
activeConnections.acquireUninterruptibly();
RedisConnection<K, V> conn = connections.poll();
if (conn == null) {
conn = redisClient.connect(config.getCodec());
conn = redisClient.connect(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}
@ -116,7 +120,7 @@ public class ConnectionManager {
}
activeConnections.acquireUninterruptibly();
RedisPubSubConnection<K, V> conn = redisClient.connectPubSub(config.getCodec());
RedisPubSubConnection<K, V> conn = redisClient.connectPubSub(codec);
if (config.getPassword() != null) {
conn.auth(config.getPassword());
}

Loading…
Cancel
Save