Fix  - global codec not registered
pull/1547/head
Nikita Koksharov committed by GitHub
commit 9ea442193d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -104,11 +104,11 @@ public class Redisson implements RedissonClient {
RedissonObjectFactory.warmUp();
RedissonReference.warmUp();
}
protected final QueueTransferService queueTransferService = new QueueTransferService();
protected final EvictionScheduler evictionScheduler;
protected final ConnectionManager connectionManager;
protected final ConcurrentMap<Class<?>, Class<?>> liveObjectClassCache = PlatformDependent.newConcurrentHashMap();
protected final Config config;
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
@ -118,19 +118,19 @@ public class Redisson implements RedissonClient {
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor());
}
public EvictionScheduler getEvictionScheduler() {
return evictionScheduler;
}
public CommandExecutor getCommandExecutor() {
return connectionManager.getCommandExecutor();
}
public ConnectionManager getConnectionManager() {
return connectionManager;
}
@ -190,27 +190,27 @@ public class Redisson implements RedissonClient {
}
return react;
}
@Override
public <K, V> RStream<K, V> getStream(String name) {
return new RedissonStream<K, V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RStream<K, V> getStream(String name, Codec codec) {
return new RedissonStream<K, V>(codec, connectionManager.getCommandExecutor(), name);
}
@Override
public RBinaryStream getBinaryStream(String name) {
return new RedissonBinaryStream(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RGeo<V> getGeo(String name) {
return new RedissonGeo<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RGeo<V> getGeo(String name, Codec codec) {
return new RedissonGeo<V>(codec, connectionManager.getCommandExecutor(), name, this);
@ -225,7 +225,7 @@ public class Redisson implements RedissonClient {
public RRateLimiter getRateLimiter(String name) {
return new RedissonRateLimiter(connectionManager.getCommandExecutor(), name);
}
@Override
public <V> RBucket<V> getBucket(String name, Codec codec) {
return new RedissonBucket<V>(codec, connectionManager.getCommandExecutor(), name);
@ -235,12 +235,12 @@ public class Redisson implements RedissonClient {
public RBuckets getBuckets() {
return new RedissonBuckets(this, connectionManager.getCommandExecutor());
}
@Override
public RBuckets getBuckets(Codec codec) {
return new RedissonBuckets(this, codec, connectionManager.getCommandExecutor());
}
@Override
public <V> RHyperLogLog<V> getHyperLogLog(String name) {
return new RedissonHyperLogLog<V>(connectionManager.getCommandExecutor(), name);
@ -285,7 +285,7 @@ public class Redisson implements RedissonClient {
public <K, V> RMap<K, V> getMap(String name) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, null);
}
@Override
public <K, V> RMap<K, V> getMap(String name, MapOptions<K, V> options) {
return new RedissonMap<K, V>(connectionManager.getCommandExecutor(), name, this, options);
@ -295,12 +295,12 @@ public class Redisson implements RedissonClient {
public <K, V> RSetMultimap<K, V> getSetMultimap(String name) {
return new RedissonSetMultimap<K, V>(connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RSetMultimapCache<K, V> getSetMultimapCache(String name, Codec codec) {
return new RedissonSetMultimapCache<K, V>(evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
@ -310,7 +310,7 @@ public class Redisson implements RedissonClient {
public <K, V> RListMultimapCache<K, V> getListMultimapCache(String name) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name);
}
@Override
public <K, V> RListMultimapCache<K, V> getListMultimapCache(String name, Codec codec) {
return new RedissonListMultimapCache<K, V>(evictionScheduler, codec, connectionManager.getCommandExecutor(), name);
@ -340,12 +340,12 @@ public class Redisson implements RedissonClient {
public <K, V> RMapCache<K, V> getMapCache(String name, MapOptions<K, V> options) {
return new RedissonMapCache<K, V>(evictionScheduler, connectionManager.getCommandExecutor(), name, this, options);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, null);
}
@Override
public <K, V> RMapCache<K, V> getMapCache(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMapCache<K, V>(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, options);
@ -355,7 +355,7 @@ public class Redisson implements RedissonClient {
public <K, V> RMap<K, V> getMap(String name, Codec codec) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, null);
}
@Override
public <K, V> RMap<K, V> getMap(String name, Codec codec, MapOptions<K, V> options) {
return new RedissonMap<K, V>(codec, connectionManager.getCommandExecutor(), name, this, options);
@ -370,7 +370,7 @@ public class Redisson implements RedissonClient {
public RLock getFairLock(String name) {
return new RedissonFairLock(connectionManager.getCommandExecutor(), name);
}
@Override
public RReadWriteLock getReadWriteLock(String name) {
return new RedissonReadWriteLock(connectionManager.getCommandExecutor(), name);
@ -395,12 +395,12 @@ public class Redisson implements RedissonClient {
public RScheduledExecutorService getExecutorService(String name) {
return getExecutorService(name, connectionManager.getCodec());
}
@Override
public RScheduledExecutorService getExecutorService(String name, ExecutorOptions options) {
return getExecutorService(name, connectionManager.getCodec(), options);
}
@Override
@Deprecated
public RScheduledExecutorService getExecutorService(Codec codec, String name) {
@ -411,12 +411,12 @@ public class Redisson implements RedissonClient {
public RScheduledExecutorService getExecutorService(String name, Codec codec) {
return getExecutorService(name, codec, ExecutorOptions.defaults());
}
@Override
public RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options) {
return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, options);
}
@Override
public RRemoteService getRemoteService() {
return getRemoteService("redisson_rs", connectionManager.getCodec());
@ -426,12 +426,12 @@ public class Redisson implements RedissonClient {
public RRemoteService getRemoteService(String name) {
return getRemoteService(name, connectionManager.getCodec());
}
@Override
public RRemoteService getRemoteService(Codec codec) {
return getRemoteService("redisson_rs", codec);
}
@Override
public RRemoteService getRemoteService(String name, Codec codec) {
String executorId;
@ -495,7 +495,7 @@ public class Redisson implements RedissonClient {
}
return new RedissonDelayedQueue<V>(queueTransferService, destinationQueue.getCodec(), connectionManager.getCommandExecutor(), destinationQueue.getName());
}
@Override
public <V> RQueue<V> getQueue(String name) {
return new RedissonQueue<V>(connectionManager.getCommandExecutor(), name, this);
@ -515,7 +515,7 @@ public class Redisson implements RedissonClient {
public <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
return new RedissonBlockingQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RBoundedBlockingQueue<V> getBoundedBlockingQueue(String name) {
return new RedissonBoundedBlockingQueue<V>(semaphorePubSub, connectionManager.getCommandExecutor(), name, this);
@ -550,12 +550,12 @@ public class Redisson implements RedissonClient {
public RAtomicLong getAtomicLong(String name) {
return new RedissonAtomicLong(connectionManager.getCommandExecutor(), name);
}
@Override
public RLongAdder getLongAdder(String name) {
return new RedissonLongAdder(connectionManager.getCommandExecutor(), name, this);
}
@Override
public RDoubleAdder getDoubleAdder(String name) {
return new RedissonDoubleAdder(connectionManager.getCommandExecutor(), name, this);
@ -580,7 +580,7 @@ public class Redisson implements RedissonClient {
public RSemaphore getSemaphore(String name) {
return new RedissonSemaphore(connectionManager.getCommandExecutor(), name, semaphorePubSub);
}
@Override
public RPermitExpirableSemaphore getPermitExpirableSemaphore(String name) {
return new RedissonPermitExpirableSemaphore(connectionManager.getCommandExecutor(), name, semaphorePubSub);
@ -614,7 +614,7 @@ public class Redisson implements RedissonClient {
}
return batch;
}
@Override
public RBatch createBatch() {
return createBatch(BatchOptions.defaults());
@ -624,13 +624,13 @@ public class Redisson implements RedissonClient {
public RLiveObjectService getLiveObjectService() {
return new RedissonLiveObjectService(this, liveObjectClassCache);
}
@Override
public void shutdown() {
connectionManager.shutdown();
}
@Override
public void shutdown(long quietPeriod, long timeout, TimeUnit unit) {
connectionManager.shutdown(quietPeriod, timeout, unit);
@ -677,12 +677,12 @@ public class Redisson implements RedissonClient {
public <V> RPriorityQueue<V> getPriorityQueue(String name, Codec codec) {
return new RedissonPriorityQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RPriorityBlockingQueue<V> getPriorityBlockingQueue(String name) {
return new RedissonPriorityBlockingQueue<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RPriorityBlockingQueue<V> getPriorityBlockingQueue(String name, Codec codec) {
return new RedissonPriorityBlockingQueue<V>(codec, connectionManager.getCommandExecutor(), name, this);
@ -692,13 +692,13 @@ public class Redisson implements RedissonClient {
public <V> RPriorityBlockingDeque<V> getPriorityBlockingDeque(String name) {
return new RedissonPriorityBlockingDeque<V>(connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RPriorityBlockingDeque<V> getPriorityBlockingDeque(String name, Codec codec) {
return new RedissonPriorityBlockingDeque<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
@Override
public <V> RPriorityDeque<V> getPriorityDeque(String name) {
return new RedissonPriorityDeque<V>(connectionManager.getCommandExecutor(), name, this);
@ -708,7 +708,6 @@ public class Redisson implements RedissonClient {
public <V> RPriorityDeque<V> getPriorityDeque(String name, Codec codec) {
return new RedissonPriorityDeque<V>(codec, connectionManager.getCommandExecutor(), name, this);
}
}
}

@ -108,14 +108,14 @@ public class RedissonReactive implements RedissonReactiveClient {
protected final ConnectionManager connectionManager;
protected final Config config;
protected final ReferenceCodecProvider codecProvider;
protected final UUID id = UUID.randomUUID();
protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub();
protected RedissonReactive(Config config) {
this.config = config;
Config configCopy = new Config(config);
connectionManager = ConfigSupport.createConnectionManager(configCopy);
commandExecutor = new CommandReactiveService(connectionManager);
evictionScheduler = new EvictionScheduler(commandExecutor);
@ -146,17 +146,17 @@ public class RedissonReactive implements RedissonReactiveClient {
public RSemaphoreReactive getSemaphore(String name) {
return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub);
}
@Override
public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name) {
return new RedissonPermitExpirableSemaphoreReactive(commandExecutor, name, semaphorePubSub);
return new RedissonPermitExpirableSemaphoreReactive(commandExecutor, name, semaphorePubSub);
}
@Override
public RReadWriteLockReactive getReadWriteLock(String name) {
return new RedissonReadWriteLockReactive(commandExecutor, name);
}
@Override
public RLockReactive getLock(String name) {
return new RedissonLockReactive(commandExecutor, name);
@ -196,8 +196,8 @@ public class RedissonReactive implements RedissonReactiveClient {
return buckets;
}
@Override
public <V> RHyperLogLogReactive<V> getHyperLogLog(String name) {
return new RedissonHyperLogLogReactive<V>(commandExecutor, name);
@ -222,7 +222,7 @@ public class RedissonReactive implements RedissonReactiveClient {
public <K, V> RListMultimapReactive<K, V> getListMultimap(String name) {
return new RedissonListMultimapReactive<K, V>(id, commandExecutor, name);
}
@Override
public <K, V> RListMultimapReactive<K, V> getListMultimap(String name, Codec codec) {
return new RedissonListMultimapReactive<K, V>(id, codec, commandExecutor, name);
@ -232,12 +232,12 @@ public class RedissonReactive implements RedissonReactiveClient {
public <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name) {
return new RedissonSetMultimapReactive<K, V>(id, commandExecutor, name);
}
@Override
public <K, V> RSetMultimapReactive<K, V> getSetMultimap(String name, Codec codec) {
return new RedissonSetMultimapReactive<K, V>(id, codec, commandExecutor, name);
}
@Override
public <K, V> RMapReactive<K, V> getMap(String name) {
return new RedissonMapReactive<K, V>(commandExecutor, name, null);
@ -337,7 +337,7 @@ public class RedissonReactive implements RedissonReactiveClient {
public RAtomicLongReactive getAtomicLong(String name) {
return new RedissonAtomicLongReactive(commandExecutor, name);
}
@Override
public RAtomicDoubleReactive getAtomicDouble(String name) {
return new RedissonAtomicDoubleReactive(commandExecutor, name);
@ -352,7 +352,7 @@ public class RedissonReactive implements RedissonReactiveClient {
public RScriptReactive getScript() {
return new RedissonScriptReactive(commandExecutor);
}
@Override
public RBatchReactive createBatch(BatchOptions options) {
RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager, options);
@ -381,7 +381,7 @@ public class RedissonReactive implements RedissonReactiveClient {
public ReferenceCodecProvider getCodecProvider() {
return codecProvider;
}
@Override
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);
@ -442,4 +442,3 @@ public class RedissonReactive implements RedissonReactiveClient {
return new RedissonTransactionReactive(commandExecutor, options);
}
}

@ -55,6 +55,8 @@ import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.MapScanResult;
import org.redisson.codec.ReferenceCodecProvider;
import org.redisson.config.Config;
import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
@ -102,6 +104,7 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonClient redisson) {
if (redisson != null) {
this.redisson = redisson;
enableRedissonReferenceSupport(redisson.getConfig());
this.redissonReactive = null;
}
return this;
@ -111,11 +114,18 @@ public class CommandAsyncService implements CommandAsyncExecutor {
public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) {
if (redissonReactive != null) {
this.redissonReactive = redissonReactive;
enableRedissonReferenceSupport(redissonReactive.getConfig());
this.redisson = null;
}
return this;
}
private void enableRedissonReferenceSupport(Config config) {
Codec codec = config.getCodec();
ReferenceCodecProvider codecProvider = config.getReferenceCodecProvider();
codecProvider.registerCodec((Class<Codec>) codec.getClass(), codec);
}
@Override
public boolean isRedissonReferenceSupportEnabled() {
return redisson != null || redissonReactive != null;

@ -2,13 +2,13 @@ package org.redisson;
import java.util.List;
import static org.junit.Assert.*;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.redisson.api.BatchOptions;
import org.redisson.api.RBatch;
import org.redisson.api.RBatchReactive;
import org.redisson.api.RBucket;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RedissonClient;
import org.redisson.api.*;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.redisson.reactive.RedissonBucketReactive;
import org.redisson.reactive.RedissonMapCacheReactive;
@ -17,7 +17,7 @@ import org.redisson.reactive.RedissonMapCacheReactive;
* @author Rui Gu (https://github.com/jackygurui)
*/
public class RedissonReferenceReactiveTest extends BaseReactiveTest {
@Test
public void test() throws InterruptedException {
RBucketReactive<Object> b1 = redisson.getBucket("b1");
@ -33,7 +33,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
sync(((RedissonMapCacheReactive) sync(b4.get())).fastPut(b1, b2));
assertEquals("b2", ((RBucketReactive) sync(((RedissonMapCacheReactive) sync(b4.get())).get(b1))).getName());
}
@Test
public void testBatch() throws InterruptedException {
RBatchReactive batch = redisson.createBatch(BatchOptions.defaults());
@ -44,7 +44,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
b1.set(b2);
b3.set(b1);
sync(batch.execute());
batch = redisson.createBatch(BatchOptions.defaults());
batch.getBucket("b1").get();
batch.getBucket("b2").get();
@ -54,7 +54,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
assertEquals("b3", result.get(1).getName());
assertEquals("b1", result.get(2).getName());
}
@Test
public void testReactiveToNormal() throws InterruptedException {
RBatchReactive batch = redisson.createBatch(BatchOptions.defaults());
@ -65,7 +65,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
b1.set(b2);
b3.set(b1);
sync(batch.execute());
RedissonClient lredisson = Redisson.create(redisson.getConfig());
RBatch b = lredisson.createBatch(BatchOptions.defaults());
b.getBucket("b1").getAsync();
@ -75,7 +75,54 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest {
assertEquals("b2", result.get(0).getName());
assertEquals("b3", result.get(1).getName());
assertEquals("b1", result.get(2).getName());
lredisson.shutdown();
}
@Test
public void shouldUseDefaultCodec() throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
objectMapper.configure(DeserializationFeature.UNWRAP_ROOT_VALUE, false);
JsonJacksonCodec codec = new JsonJacksonCodec(objectMapper);
Config config = new Config();
config.setCodec(codec);
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonReactiveClient reactive = Redisson.createReactive(config);
RBucketReactive<Object> b1 = reactive.getBucket("b1");
sync(b1.set(new MyObject()));
RSetReactive<Object> s1 = reactive.getSet("s1");
assertTrue(sync(s1.add(b1)) == 1);
assertTrue(codec == b1.getCodec());
Config config1 = new Config();
config1.setCodec(codec);
config1.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonReactiveClient reactive1 = Redisson.createReactive(config1);
RSetReactive<RBucketReactive> s2 = reactive1.getSet("s1");
RBucketReactive<MyObject> b2 = sync(s2.iterator(1));
assertTrue(codec == b2.getCodec());
assertTrue(sync(b2.get()) instanceof MyObject);
reactive.shutdown();
reactive1.shutdown();
}
public static class MyObject {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

@ -4,29 +4,14 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.redisson.api.LocalCachedMapOptions;
import org.redisson.api.RBatch;
import org.redisson.api.RBatchReactive;
import org.redisson.api.RBucket;
import org.redisson.api.RBucketAsync;
import org.redisson.api.RBucketReactive;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RGeo;
import org.redisson.api.RList;
import org.redisson.api.RListMultimap;
import org.redisson.api.RLiveObject;
import org.redisson.api.RLiveObjectService;
import org.redisson.api.RLocalCachedMap;
import org.redisson.api.RMap;
import org.redisson.api.RMapCache;
import org.redisson.api.RPriorityQueue;
import org.redisson.api.RQueue;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RSet;
import org.redisson.api.RSetCache;
import org.redisson.api.RSetMultimap;
import org.redisson.api.*;
import org.redisson.client.protocol.ScoredEntry;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
/**
*
@ -233,4 +218,52 @@ public class RedissonReferenceTest extends BaseTest {
assertNotEquals(1, redisson.getKeys().count());
assertEquals(3, redisson.getKeys().count());
}
@Test
public void shouldUseDefaultCodec() throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true);
objectMapper.configure(DeserializationFeature.UNWRAP_ROOT_VALUE, false);
JsonJacksonCodec codec = new JsonJacksonCodec(objectMapper);
Config config = new Config();
config.setCodec(codec);
config.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonClient redissonClient = Redisson.create(config);
RBucket<Object> b1 = redissonClient.getBucket("b1");
b1.set(new MyObject());
RSet<Object> s1 = redissonClient.getSet("s1");
assertTrue(s1.add(b1));
assertTrue(codec == b1.getCodec());
Config config1 = new Config();
config1.setCodec(codec);
config1.useSingleServer()
.setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort());
RedissonClient redissonClient1 = Redisson.create(config1);
RSet<RBucket> s2 = redissonClient1.getSet("s1");
RBucket<MyObject> b2 = s2.iterator(1).next();
assertTrue(codec == b2.getCodec());
assertTrue(b2.get() instanceof MyObject);
redissonClient.shutdown();
redissonClient1.shutdown();
}
public static class MyObject {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}

Loading…
Cancel
Save