diff --git a/redisson/src/main/java/org/redisson/Redisson.java b/redisson/src/main/java/org/redisson/Redisson.java index efb1ae4e7..e05352662 100755 --- a/redisson/src/main/java/org/redisson/Redisson.java +++ b/redisson/src/main/java/org/redisson/Redisson.java @@ -104,11 +104,11 @@ public class Redisson implements RedissonClient { RedissonObjectFactory.warmUp(); RedissonReference.warmUp(); } - + protected final QueueTransferService queueTransferService = new QueueTransferService(); protected final EvictionScheduler evictionScheduler; protected final ConnectionManager connectionManager; - + protected final ConcurrentMap, Class> liveObjectClassCache = PlatformDependent.newConcurrentHashMap(); protected final Config config; protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(); @@ -118,19 +118,19 @@ public class Redisson implements RedissonClient { protected Redisson(Config config) { this.config = config; Config configCopy = new Config(config); - + connectionManager = ConfigSupport.createConnectionManager(configCopy); evictionScheduler = new EvictionScheduler(connectionManager.getCommandExecutor()); } - + public EvictionScheduler getEvictionScheduler() { return evictionScheduler; } - + public CommandExecutor getCommandExecutor() { return connectionManager.getCommandExecutor(); } - + public ConnectionManager getConnectionManager() { return connectionManager; } @@ -190,27 +190,27 @@ public class Redisson implements RedissonClient { } return react; } - + @Override public RStream getStream(String name) { return new RedissonStream(connectionManager.getCommandExecutor(), name); } - + @Override public RStream getStream(String name, Codec codec) { return new RedissonStream(codec, connectionManager.getCommandExecutor(), name); } - + @Override public RBinaryStream getBinaryStream(String name) { return new RedissonBinaryStream(connectionManager.getCommandExecutor(), name); } - + @Override public RGeo getGeo(String name) { return new RedissonGeo(connectionManager.getCommandExecutor(), name, this); } - + @Override public RGeo getGeo(String name, Codec codec) { return new RedissonGeo(codec, connectionManager.getCommandExecutor(), name, this); @@ -225,7 +225,7 @@ public class Redisson implements RedissonClient { public RRateLimiter getRateLimiter(String name) { return new RedissonRateLimiter(connectionManager.getCommandExecutor(), name); } - + @Override public RBucket getBucket(String name, Codec codec) { return new RedissonBucket(codec, connectionManager.getCommandExecutor(), name); @@ -235,12 +235,12 @@ public class Redisson implements RedissonClient { public RBuckets getBuckets() { return new RedissonBuckets(this, connectionManager.getCommandExecutor()); } - + @Override public RBuckets getBuckets(Codec codec) { return new RedissonBuckets(this, codec, connectionManager.getCommandExecutor()); } - + @Override public RHyperLogLog getHyperLogLog(String name) { return new RedissonHyperLogLog(connectionManager.getCommandExecutor(), name); @@ -285,7 +285,7 @@ public class Redisson implements RedissonClient { public RMap getMap(String name) { return new RedissonMap(connectionManager.getCommandExecutor(), name, this, null); } - + @Override public RMap getMap(String name, MapOptions options) { return new RedissonMap(connectionManager.getCommandExecutor(), name, this, options); @@ -295,12 +295,12 @@ public class Redisson implements RedissonClient { public RSetMultimap getSetMultimap(String name) { return new RedissonSetMultimap(connectionManager.getCommandExecutor(), name); } - + @Override public RSetMultimapCache getSetMultimapCache(String name) { return new RedissonSetMultimapCache(evictionScheduler, connectionManager.getCommandExecutor(), name); } - + @Override public RSetMultimapCache getSetMultimapCache(String name, Codec codec) { return new RedissonSetMultimapCache(evictionScheduler, codec, connectionManager.getCommandExecutor(), name); @@ -310,7 +310,7 @@ public class Redisson implements RedissonClient { public RListMultimapCache getListMultimapCache(String name) { return new RedissonListMultimapCache(evictionScheduler, connectionManager.getCommandExecutor(), name); } - + @Override public RListMultimapCache getListMultimapCache(String name, Codec codec) { return new RedissonListMultimapCache(evictionScheduler, codec, connectionManager.getCommandExecutor(), name); @@ -340,12 +340,12 @@ public class Redisson implements RedissonClient { public RMapCache getMapCache(String name, MapOptions options) { return new RedissonMapCache(evictionScheduler, connectionManager.getCommandExecutor(), name, this, options); } - + @Override public RMapCache getMapCache(String name, Codec codec) { return new RedissonMapCache(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, null); } - + @Override public RMapCache getMapCache(String name, Codec codec, MapOptions options) { return new RedissonMapCache(codec, evictionScheduler, connectionManager.getCommandExecutor(), name, this, options); @@ -355,7 +355,7 @@ public class Redisson implements RedissonClient { public RMap getMap(String name, Codec codec) { return new RedissonMap(codec, connectionManager.getCommandExecutor(), name, this, null); } - + @Override public RMap getMap(String name, Codec codec, MapOptions options) { return new RedissonMap(codec, connectionManager.getCommandExecutor(), name, this, options); @@ -370,7 +370,7 @@ public class Redisson implements RedissonClient { public RLock getFairLock(String name) { return new RedissonFairLock(connectionManager.getCommandExecutor(), name); } - + @Override public RReadWriteLock getReadWriteLock(String name) { return new RedissonReadWriteLock(connectionManager.getCommandExecutor(), name); @@ -395,12 +395,12 @@ public class Redisson implements RedissonClient { public RScheduledExecutorService getExecutorService(String name) { return getExecutorService(name, connectionManager.getCodec()); } - + @Override public RScheduledExecutorService getExecutorService(String name, ExecutorOptions options) { return getExecutorService(name, connectionManager.getCodec(), options); } - + @Override @Deprecated public RScheduledExecutorService getExecutorService(Codec codec, String name) { @@ -411,12 +411,12 @@ public class Redisson implements RedissonClient { public RScheduledExecutorService getExecutorService(String name, Codec codec) { return getExecutorService(name, codec, ExecutorOptions.defaults()); } - + @Override public RScheduledExecutorService getExecutorService(String name, Codec codec, ExecutorOptions options) { return new RedissonExecutorService(codec, connectionManager.getCommandExecutor(), this, name, queueTransferService, responses, options); } - + @Override public RRemoteService getRemoteService() { return getRemoteService("redisson_rs", connectionManager.getCodec()); @@ -426,12 +426,12 @@ public class Redisson implements RedissonClient { public RRemoteService getRemoteService(String name) { return getRemoteService(name, connectionManager.getCodec()); } - + @Override public RRemoteService getRemoteService(Codec codec) { return getRemoteService("redisson_rs", codec); } - + @Override public RRemoteService getRemoteService(String name, Codec codec) { String executorId; @@ -495,7 +495,7 @@ public class Redisson implements RedissonClient { } return new RedissonDelayedQueue(queueTransferService, destinationQueue.getCodec(), connectionManager.getCommandExecutor(), destinationQueue.getName()); } - + @Override public RQueue getQueue(String name) { return new RedissonQueue(connectionManager.getCommandExecutor(), name, this); @@ -515,7 +515,7 @@ public class Redisson implements RedissonClient { public RBlockingQueue getBlockingQueue(String name, Codec codec) { return new RedissonBlockingQueue(codec, connectionManager.getCommandExecutor(), name, this); } - + @Override public RBoundedBlockingQueue getBoundedBlockingQueue(String name) { return new RedissonBoundedBlockingQueue(semaphorePubSub, connectionManager.getCommandExecutor(), name, this); @@ -550,12 +550,12 @@ public class Redisson implements RedissonClient { public RAtomicLong getAtomicLong(String name) { return new RedissonAtomicLong(connectionManager.getCommandExecutor(), name); } - + @Override public RLongAdder getLongAdder(String name) { return new RedissonLongAdder(connectionManager.getCommandExecutor(), name, this); } - + @Override public RDoubleAdder getDoubleAdder(String name) { return new RedissonDoubleAdder(connectionManager.getCommandExecutor(), name, this); @@ -580,7 +580,7 @@ public class Redisson implements RedissonClient { public RSemaphore getSemaphore(String name) { return new RedissonSemaphore(connectionManager.getCommandExecutor(), name, semaphorePubSub); } - + @Override public RPermitExpirableSemaphore getPermitExpirableSemaphore(String name) { return new RedissonPermitExpirableSemaphore(connectionManager.getCommandExecutor(), name, semaphorePubSub); @@ -614,7 +614,7 @@ public class Redisson implements RedissonClient { } return batch; } - + @Override public RBatch createBatch() { return createBatch(BatchOptions.defaults()); @@ -624,13 +624,13 @@ public class Redisson implements RedissonClient { public RLiveObjectService getLiveObjectService() { return new RedissonLiveObjectService(this, liveObjectClassCache); } - + @Override public void shutdown() { connectionManager.shutdown(); } - - + + @Override public void shutdown(long quietPeriod, long timeout, TimeUnit unit) { connectionManager.shutdown(quietPeriod, timeout, unit); @@ -677,12 +677,12 @@ public class Redisson implements RedissonClient { public RPriorityQueue getPriorityQueue(String name, Codec codec) { return new RedissonPriorityQueue(codec, connectionManager.getCommandExecutor(), name, this); } - + @Override public RPriorityBlockingQueue getPriorityBlockingQueue(String name) { return new RedissonPriorityBlockingQueue(connectionManager.getCommandExecutor(), name, this); } - + @Override public RPriorityBlockingQueue getPriorityBlockingQueue(String name, Codec codec) { return new RedissonPriorityBlockingQueue(codec, connectionManager.getCommandExecutor(), name, this); @@ -692,13 +692,13 @@ public class Redisson implements RedissonClient { public RPriorityBlockingDeque getPriorityBlockingDeque(String name) { return new RedissonPriorityBlockingDeque(connectionManager.getCommandExecutor(), name, this); } - + @Override public RPriorityBlockingDeque getPriorityBlockingDeque(String name, Codec codec) { return new RedissonPriorityBlockingDeque(codec, connectionManager.getCommandExecutor(), name, this); } - + @Override public RPriorityDeque getPriorityDeque(String name) { return new RedissonPriorityDeque(connectionManager.getCommandExecutor(), name, this); @@ -708,7 +708,6 @@ public class Redisson implements RedissonClient { public RPriorityDeque getPriorityDeque(String name, Codec codec) { return new RedissonPriorityDeque(codec, connectionManager.getCommandExecutor(), name, this); } - -} +} diff --git a/redisson/src/main/java/org/redisson/RedissonReactive.java b/redisson/src/main/java/org/redisson/RedissonReactive.java index da2f99049..4a658bb94 100644 --- a/redisson/src/main/java/org/redisson/RedissonReactive.java +++ b/redisson/src/main/java/org/redisson/RedissonReactive.java @@ -108,14 +108,14 @@ public class RedissonReactive implements RedissonReactiveClient { protected final ConnectionManager connectionManager; protected final Config config; protected final ReferenceCodecProvider codecProvider; - + protected final UUID id = UUID.randomUUID(); protected final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(); - + protected RedissonReactive(Config config) { this.config = config; Config configCopy = new Config(config); - + connectionManager = ConfigSupport.createConnectionManager(configCopy); commandExecutor = new CommandReactiveService(connectionManager); evictionScheduler = new EvictionScheduler(commandExecutor); @@ -146,17 +146,17 @@ public class RedissonReactive implements RedissonReactiveClient { public RSemaphoreReactive getSemaphore(String name) { return new RedissonSemaphoreReactive(commandExecutor, name, semaphorePubSub); } - + @Override public RPermitExpirableSemaphoreReactive getPermitExpirableSemaphore(String name) { - return new RedissonPermitExpirableSemaphoreReactive(commandExecutor, name, semaphorePubSub); + return new RedissonPermitExpirableSemaphoreReactive(commandExecutor, name, semaphorePubSub); } - + @Override public RReadWriteLockReactive getReadWriteLock(String name) { return new RedissonReadWriteLockReactive(commandExecutor, name); } - + @Override public RLockReactive getLock(String name) { return new RedissonLockReactive(commandExecutor, name); @@ -196,8 +196,8 @@ public class RedissonReactive implements RedissonReactiveClient { return buckets; } - - + + @Override public RHyperLogLogReactive getHyperLogLog(String name) { return new RedissonHyperLogLogReactive(commandExecutor, name); @@ -222,7 +222,7 @@ public class RedissonReactive implements RedissonReactiveClient { public RListMultimapReactive getListMultimap(String name) { return new RedissonListMultimapReactive(id, commandExecutor, name); } - + @Override public RListMultimapReactive getListMultimap(String name, Codec codec) { return new RedissonListMultimapReactive(id, codec, commandExecutor, name); @@ -232,12 +232,12 @@ public class RedissonReactive implements RedissonReactiveClient { public RSetMultimapReactive getSetMultimap(String name) { return new RedissonSetMultimapReactive(id, commandExecutor, name); } - + @Override public RSetMultimapReactive getSetMultimap(String name, Codec codec) { return new RedissonSetMultimapReactive(id, codec, commandExecutor, name); } - + @Override public RMapReactive getMap(String name) { return new RedissonMapReactive(commandExecutor, name, null); @@ -337,7 +337,7 @@ public class RedissonReactive implements RedissonReactiveClient { public RAtomicLongReactive getAtomicLong(String name) { return new RedissonAtomicLongReactive(commandExecutor, name); } - + @Override public RAtomicDoubleReactive getAtomicDouble(String name) { return new RedissonAtomicDoubleReactive(commandExecutor, name); @@ -352,7 +352,7 @@ public class RedissonReactive implements RedissonReactiveClient { public RScriptReactive getScript() { return new RedissonScriptReactive(commandExecutor); } - + @Override public RBatchReactive createBatch(BatchOptions options) { RedissonBatchReactive batch = new RedissonBatchReactive(evictionScheduler, connectionManager, options); @@ -381,7 +381,7 @@ public class RedissonReactive implements RedissonReactiveClient { public ReferenceCodecProvider getCodecProvider() { return codecProvider; } - + @Override public NodesGroup getNodesGroup() { return new RedisNodes(connectionManager); @@ -442,4 +442,3 @@ public class RedissonReactive implements RedissonReactiveClient { return new RedissonTransactionReactive(commandExecutor, options); } } - diff --git a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java index 5cf491ec4..975a8282f 100644 --- a/redisson/src/main/java/org/redisson/command/CommandAsyncService.java +++ b/redisson/src/main/java/org/redisson/command/CommandAsyncService.java @@ -55,6 +55,8 @@ import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.ScoredEntry; import org.redisson.client.protocol.decoder.ListScanResult; import org.redisson.client.protocol.decoder.MapScanResult; +import org.redisson.codec.ReferenceCodecProvider; +import org.redisson.config.Config; import org.redisson.config.MasterSlaveServersConfig; import org.redisson.connection.ConnectionManager; import org.redisson.connection.MasterSlaveEntry; @@ -102,6 +104,7 @@ public class CommandAsyncService implements CommandAsyncExecutor { public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonClient redisson) { if (redisson != null) { this.redisson = redisson; + enableRedissonReferenceSupport(redisson.getConfig()); this.redissonReactive = null; } return this; @@ -111,11 +114,18 @@ public class CommandAsyncService implements CommandAsyncExecutor { public CommandAsyncExecutor enableRedissonReferenceSupport(RedissonReactiveClient redissonReactive) { if (redissonReactive != null) { this.redissonReactive = redissonReactive; + enableRedissonReferenceSupport(redissonReactive.getConfig()); this.redisson = null; } return this; } + private void enableRedissonReferenceSupport(Config config) { + Codec codec = config.getCodec(); + ReferenceCodecProvider codecProvider = config.getReferenceCodecProvider(); + codecProvider.registerCodec((Class) codec.getClass(), codec); + } + @Override public boolean isRedissonReferenceSupportEnabled() { return redisson != null || redissonReactive != null; diff --git a/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java b/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java index db59e4802..cac728d8a 100644 --- a/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReferenceReactiveTest.java @@ -2,13 +2,13 @@ package org.redisson; import java.util.List; import static org.junit.Assert.*; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Test; -import org.redisson.api.BatchOptions; -import org.redisson.api.RBatch; -import org.redisson.api.RBatchReactive; -import org.redisson.api.RBucket; -import org.redisson.api.RBucketReactive; -import org.redisson.api.RedissonClient; +import org.redisson.api.*; +import org.redisson.codec.JsonJacksonCodec; +import org.redisson.config.Config; import org.redisson.reactive.RedissonBucketReactive; import org.redisson.reactive.RedissonMapCacheReactive; @@ -17,7 +17,7 @@ import org.redisson.reactive.RedissonMapCacheReactive; * @author Rui Gu (https://github.com/jackygurui) */ public class RedissonReferenceReactiveTest extends BaseReactiveTest { - + @Test public void test() throws InterruptedException { RBucketReactive b1 = redisson.getBucket("b1"); @@ -33,7 +33,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest { sync(((RedissonMapCacheReactive) sync(b4.get())).fastPut(b1, b2)); assertEquals("b2", ((RBucketReactive) sync(((RedissonMapCacheReactive) sync(b4.get())).get(b1))).getName()); } - + @Test public void testBatch() throws InterruptedException { RBatchReactive batch = redisson.createBatch(BatchOptions.defaults()); @@ -44,7 +44,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest { b1.set(b2); b3.set(b1); sync(batch.execute()); - + batch = redisson.createBatch(BatchOptions.defaults()); batch.getBucket("b1").get(); batch.getBucket("b2").get(); @@ -54,7 +54,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest { assertEquals("b3", result.get(1).getName()); assertEquals("b1", result.get(2).getName()); } - + @Test public void testReactiveToNormal() throws InterruptedException { RBatchReactive batch = redisson.createBatch(BatchOptions.defaults()); @@ -65,7 +65,7 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest { b1.set(b2); b3.set(b1); sync(batch.execute()); - + RedissonClient lredisson = Redisson.create(redisson.getConfig()); RBatch b = lredisson.createBatch(BatchOptions.defaults()); b.getBucket("b1").getAsync(); @@ -75,7 +75,54 @@ public class RedissonReferenceReactiveTest extends BaseReactiveTest { assertEquals("b2", result.get(0).getName()); assertEquals("b3", result.get(1).getName()); assertEquals("b1", result.get(2).getName()); - + lredisson.shutdown(); } + + @Test + public void shouldUseDefaultCodec() throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); + objectMapper.configure(DeserializationFeature.UNWRAP_ROOT_VALUE, false); + JsonJacksonCodec codec = new JsonJacksonCodec(objectMapper); + + Config config = new Config(); + config.setCodec(codec); + config.useSingleServer() + .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + + RedissonReactiveClient reactive = Redisson.createReactive(config); + RBucketReactive b1 = reactive.getBucket("b1"); + sync(b1.set(new MyObject())); + RSetReactive s1 = reactive.getSet("s1"); + assertTrue(sync(s1.add(b1)) == 1); + assertTrue(codec == b1.getCodec()); + + Config config1 = new Config(); + config1.setCodec(codec); + config1.useSingleServer() + .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + RedissonReactiveClient reactive1 = Redisson.createReactive(config1); + + RSetReactive s2 = reactive1.getSet("s1"); + RBucketReactive b2 = sync(s2.iterator(1)); + assertTrue(codec == b2.getCodec()); + assertTrue(sync(b2.get()) instanceof MyObject); + reactive.shutdown(); + reactive1.shutdown(); + } + + public static class MyObject { + + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + } } diff --git a/redisson/src/test/java/org/redisson/RedissonReferenceTest.java b/redisson/src/test/java/org/redisson/RedissonReferenceTest.java index dc5397c20..38628334e 100644 --- a/redisson/src/test/java/org/redisson/RedissonReferenceTest.java +++ b/redisson/src/test/java/org/redisson/RedissonReferenceTest.java @@ -4,29 +4,14 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Test; -import org.redisson.api.LocalCachedMapOptions; -import org.redisson.api.RBatch; -import org.redisson.api.RBatchReactive; -import org.redisson.api.RBucket; -import org.redisson.api.RBucketAsync; -import org.redisson.api.RBucketReactive; -import org.redisson.api.RDelayedQueue; -import org.redisson.api.RGeo; -import org.redisson.api.RList; -import org.redisson.api.RListMultimap; -import org.redisson.api.RLiveObject; -import org.redisson.api.RLiveObjectService; -import org.redisson.api.RLocalCachedMap; -import org.redisson.api.RMap; -import org.redisson.api.RMapCache; -import org.redisson.api.RPriorityQueue; -import org.redisson.api.RQueue; -import org.redisson.api.RScoredSortedSet; -import org.redisson.api.RSet; -import org.redisson.api.RSetCache; -import org.redisson.api.RSetMultimap; +import org.redisson.api.*; import org.redisson.client.protocol.ScoredEntry; +import org.redisson.codec.JsonJacksonCodec; +import org.redisson.config.Config; /** * @@ -233,4 +218,52 @@ public class RedissonReferenceTest extends BaseTest { assertNotEquals(1, redisson.getKeys().count()); assertEquals(3, redisson.getKeys().count()); } + + + @Test + public void shouldUseDefaultCodec() throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); + objectMapper.configure(DeserializationFeature.UNWRAP_ROOT_VALUE, false); + JsonJacksonCodec codec = new JsonJacksonCodec(objectMapper); + + Config config = new Config(); + config.setCodec(codec); + config.useSingleServer() + .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + + RedissonClient redissonClient = Redisson.create(config); + RBucket b1 = redissonClient.getBucket("b1"); + b1.set(new MyObject()); + RSet s1 = redissonClient.getSet("s1"); + assertTrue(s1.add(b1)); + assertTrue(codec == b1.getCodec()); + + Config config1 = new Config(); + config1.setCodec(codec); + config1.useSingleServer() + .setAddress(RedisRunner.getDefaultRedisServerBindAddressAndPort()); + RedissonClient redissonClient1 = Redisson.create(config1); + + RSet s2 = redissonClient1.getSet("s1"); + RBucket b2 = s2.iterator(1).next(); + assertTrue(codec == b2.getCodec()); + assertTrue(b2.get() instanceof MyObject); + redissonClient.shutdown(); + redissonClient1.shutdown(); + } + + public static class MyObject { + + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + } }